import socket import json import numpy as np import math import RPi.GPIO as GPIO import time import serial import struct import threading # 用于创建串口接收线程 # GPIO 代码 # 定义继电器引脚 EN_Relay = 17 #预期的帧长度,可根据实际情况修改 EXPECTED_LENGTH = 10 #采样时间、延时时间和衔接机器人速度 sample_time = 100 #50 sleep_time = 0.01 #0.008 #0.005 move_speed = 75 #试运行速度 sample_time_test = 100 #3 sleep_time_test = 0.05 #0.003 move_speed_test = 20 #喷枪开启和关闭位置 turn_on_relay_start = 20 turn_on_relay_end = 25 turn_off_relay_start = 290 turn_off_relay_end = 295 # 定义转换矩阵R_T R_T = np.array([ [-0.7206, -0.6908, 0], [0.6884, -0.7231, 0], [0.0830, 0, 1] ]) def init_gpio(): """初始化GPIO设置""" GPIO.setmode(GPIO.BCM) GPIO.setup(EN_Relay, GPIO.OUT) # 初始化为高电平(关闭状态) GPIO.output(EN_Relay, GPIO.LOW) time.sleep(1) # 等待稳定 def turn_on_relay(): """打开继电器(设置为LOW)""" try: GPIO.output(EN_Relay, GPIO.HIGH) print('Relay turned on (LOW)') except Exception as e: print(f"Error turning on relay: {e}") def turn_off_relay(): """关闭继电器(设置为HIGH)""" try: GPIO.output(EN_Relay, GPIO.LOW) print('Relay turned off (HIGH)') except Exception as e: print(f"Error turning off relay: {e}") # 初始化串口 def serial_init(): """初始化串口""" try: ser = serial.Serial( port="/dev/ttyS0", baudrate=115200, timeout=0.1 # 读取超时时间 ) print("串口初始化成功") return ser except Exception as e: print(f"串口初始化失败: {e}") return None class SerialSharedData: """线程安全的串口数据共享存储类""" def __init__(self): self.lock = threading.Lock() # 防止多线程数据竞争 self.buffer = b'' # 用于解析0xFF坐标帧的通用缓冲区 self.cmd_buffer = b'' # 用于解析0xAA指令帧(主指令/子指令/停止指令)的缓冲区 # 初始化全局共享数据实例 serial_shared = SerialSharedData() #串口接收 def serial_receive_data(ser): """线程内持续接收串口数据,存入共享缓冲区(无返回值)""" while True: try: # 严格检查串口有效性 if not ser or not isinstance(ser, serial.Serial) or not ser.is_open: time.sleep(0.1) continue # 读取所有可用数据 if ser.in_waiting > 0: data = ser.read(ser.in_waiting) with serial_shared.lock: # 数据同时写入两个缓冲区,分别供坐标解析和指令解析使用 serial_shared.buffer += data serial_shared.cmd_buffer += data except Exception as e: print(f"串口接收线程错误: {str(e)}") time.sleep(0.001) # 降低CPU占用率 # 解析以0xFF为帧头的数据包 def parse_frame_threaded(timeout=0.1): """线程安全的0xFF坐标帧解析,从共享缓冲区读取数据""" frame_header = 0xFF frame_length = 7 # 帧结构:0xFF + x(2) + y(2) + z(2) start_time = time.time() while (time.time() - start_time) < timeout: # 加锁读取缓冲区(避免线程阻塞) with serial_shared.lock: current_buf = serial_shared.buffer[:] if len(current_buf) >= frame_length: header_index = current_buf.find(bytes([frame_header])) if header_index != -1 and header_index + frame_length <= len(current_buf): # 提取完整帧并更新共享缓冲区 frame = current_buf[header_index:header_index+frame_length] with serial_shared.lock: serial_shared.buffer = serial_shared.buffer[header_index+frame_length:] # 坐标解析逻辑(与原有完全一致) x_coord = (frame[1] << 8) | frame[2] x_coord = x_coord - 0x10000 if x_coord & 0x8000 else x_coord y_coord = (frame[3] << 8) | frame[4] y_coord = y_coord - 0x10000 if y_coord & 0x8000 else y_coord z_coord = (frame[5] << 8) | frame[6] z_coord = z_coord - 0x10000 if z_coord & 0x8000 else z_coord print(f"解析成功 - 帧头: 0x{frame_header:02X}, " f"x: 0x{frame[1]:02X}{frame[2]:02X}={x_coord}, " f"y: 0x{frame[3]:02X}{frame[4]:02X}={y_coord}, " f"z: 0x{frame[5]:02X}{frame[6]:02X}={z_coord}") return (True, x_coord, y_coord, z_coord) time.sleep(0.01) # 超时处理(与原有一致) print("解析0xFF帧头数据超时,将使用上次的补偿值继续执行") return (False, 0, 0, 0) def read_cmd_from_shared(timeout=1.0): frame_header = 0xAA cmd_length = 2 # 指令帧结构:0xAA(帧头) + 1字节指令值 start_time = time.time() while (time.time() - start_time) < timeout: # 加锁读取指令缓冲区 with serial_shared.lock: current_cmd_buf = serial_shared.cmd_buffer[:] if len(current_cmd_buf) >= cmd_length: header_index = current_cmd_buf.find(bytes([frame_header])) if header_index != -1: # 提取完整指令帧 cmd_bytes = current_cmd_buf[header_index:header_index+cmd_length] # 更新共享缓冲区(移除已处理指令) with serial_shared.lock: serial_shared.cmd_buffer = serial_shared.cmd_buffer[header_index+cmd_length:] return cmd_bytes time.sleep(0.01) return None # 超时返回None # 矩阵转换函数:将P_L1_O2通过R_T转换为P_L1_O1 def transform_data(x_coord, y_coord, z_coord): # 将接收到的3个坐标组成列矩阵P_L1_O2 P_L1_O2 = np.array([[x_coord], [y_coord], [z_coord]], dtype=np.float64) # 执行矩阵乘法:P_L1_O1 = R_T * P_L1_O2 P_L1_O1 = np.dot(R_T, P_L1_O2) # 提取转换后的三个数据 increments_x = P_L1_O1[0][0] increments_y = P_L1_O1[1][0] increments_z = P_L1_O1[2][0] print(f"矩阵转换完成 - increments_x: {increments_x:.4f}, increments_y: {increments_y:.4f}, increments_z: {increments_z:.4f}") return increments_x, increments_y, increments_z # 连接机器人控制器 def connectETController(ip, port=8055): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect((ip, port)) return (True, sock) except Exception as e: sock.close() return (False,) # 发送命令并接收响应 def sendCMD(sock, cmd, params=None, id=1): if not params: params = [] else: params = json.dumps(params) sendStr = '{"method":"%s","params":%s,"jsonrpc":"2.0","id":%d}' % (cmd, params, id) + "\n" try: sock.sendall(bytes(sendStr, "utf-8")) ret = sock.recv(1024) jdata = json.loads(str(ret, "utf-8")) if "result" in jdata.keys(): return (True, json.loads(jdata["result"]), jdata["id"]) elif "error" in jdata.keys(): return (False, jdata["error"], jdata["id"]) else: return (False, None, None) except Exception as e: return (False, None, None) def send_Point(sock, cmd, params=None, id=1): if (not params): params = [] else: params = json.dumps(params) sendStr = "{{\"method\":\"{0}\",\"params\":{1},\"jsonrpc\":\"2.0\",\"id\":{2}}}".format(cmd, params, id) + "\n" sock.sendall(bytes(sendStr, "utf-8")) def wait_stop(sock): # 修正:添加sock参数 while True: time.sleep(0.1) ret1, result1, id1 = sendCMD(sock, "getRobotState") # getRobotstate if (ret1): if result1 == 0 or result1 == 4: break else: print("getRobotState failed") break #需要做8端程序, # 第1段,起点处的关节位置(txt);第2段,上升喷涂(txt); # 第3段,起点初的关节位置(txt);第4段,上升喷涂(txt); # 第5段,起点初的关节位置(txt);第6段,上升喷涂(txt); # 第7段,起点初的关节位置(txt);第8段,上升喷涂(txt); # 第一段程序封装为函数 def program1(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 turn_off_relay() print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_1.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position else: print("Pose_1.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_1.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time) # 循环计数器加1,记录已处理的行数 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第一段程序执行完") # 第二段程序封装为函数 def program2(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第二段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第二段程序执行完") # 第三段程序封装为函数,hjb改 def program3(sock): # 记录开始时间 start_time = time.time() i = 0 turn_off_relay() print("关闭喷枪") increments = [increments_x, increments_y, increments_z, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_2.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: print("Pose_2.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list #print(modified_list) if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed}) #print(result) wait_stop(sock) suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) time.sleep(sleep_time) i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第三段程序执行完") # 第四段程序封装为函数 def program4(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第四段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第四段程序执行完") # 第五段程序封装为函数,hjb改 def program5(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 turn_off_relay() print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_3.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_3.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_3.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 # print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第五段程序执行完") # 第六段程序封装为函数 def program6(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第六段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_3.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第六段程序执行完") # 第七段程序封装为函数,hjb改 def program7(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 turn_off_relay() print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_4.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_4.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_4.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第七段程序执行完") # 第八段程序封装为函数 def program8(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第八段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_4.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第八段程序执行完") # 第九段程序封装为函数,hjb改 def program9(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 turn_off_relay() ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [0, 0, 0, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_5.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_5.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_5.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": 5, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(0.01) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第九段程序执行完") def program10(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [0, 0, 0, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_6.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_6.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_6.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": 5, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(0.01) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第十段程序执行完") def program11(sock): start_time = time.time() # 返回当前时间的时间戳(秒) # 初始化循环计数器i,用于记录处理的关节位置行数 i = 0 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [0, 0, 0, 0, 0, 0] # 大臂车的位姿误差只补偿空间位置不补偿姿态,所有后面3个都是0; ###大臂车补偿量 # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化参考位置 # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 初始化逆解得到的新关节位置 # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_7.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position # 将修改后的位姿列表 Modified_First_Position 赋值给 target_pose,作为目标位姿使用 else: # 如果文件为空,打印错误信息 print("Pose_7.txt为空,无法获取初始位姿") # 关闭文件 pose_file.close() # 退出函数 return # 关闭Pose_1.txt文件 pose_file.close() # 定义关节位置文件的文件名 file_name = '/home/raspberrypi/robot1/joint_positions_assembled_7.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # A组关节位置赋值给参考位置 # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 等待机器人停止 # 打印提示信息,标识已完成起始点运动 #print(1) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": 5, "smoothness": 1, "response_enable": 0}) # 打印透传初始化的结果 #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) # 打印上一条命令的执行结果(注:此处result可能还是透传初始化的结果,需注意是否需要更新) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(0.01) # 循环计数器加1,记录已处理的行数 i = i + 1 # 关闭关节位置文件 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") # 打印提示信息,标识第三段程序执行完毕 print("第十一段程序执行完") # 第十二段程序封装为函数 def program12(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 #turn_off_relay() #print("关闭喷枪")#关闭喷枪 ###大臂车补偿量 # 定义大臂车的位姿补偿量,前3个值为空间位置补偿,后3个为姿态补偿(此处姿态不补偿,均为0) increments = [increments_x, increments_y, increments_z, 0, 0, 0] # 初始化参考关节位置变量,用于存储第一组关节位置 reference_pos = None # 初始化新关节位置变量,用于存储逆解计算得到的关节位置 new_joint_pos = None # 打开Pose_1.txt文件,以只读模式获取第一组末端位姿并计算目标位姿 pose_file = open('/home/raspberrypi/robot1/Pose_1.txt', "r") # 读取文件第一行内容,并去除首尾的空白字符(包括换行符) first_pose_line = pose_file.readline().strip() # 读取第一组位姿 # 判断是否成功读取到第一行位姿数据 if first_pose_line: # 将读取到的字符串按逗号分割,并转换为浮点数列表,得到原始路径的第一个位置 Path_First_Position = list(map(float, first_pose_line.split(','))) # 将原始位置与补偿量逐元素相加,得到修正后的第一个位置 Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] # 将修正后的位置赋值给target_pose,作为后续逆解计算的目标位姿 target_pose = Modified_First_Position else: print("Pose_1.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_1.txt' # 打开关节位置文件,以只读模式读取内容 fo = open(file_name, "r") # 循环读取文件中的每一行内容 while 1: # 读取一行内容 line = fo.readline() # 如果读取到空行(文件结束),则跳出循环 if not line: break # 去除行首尾的空白字符 line_list = line.strip() # 将字符串按逗号分割,并转换为浮点数列表,得到当前行的关节位置数据 line_list = list(map(float, line_list.split(','))) # 判断是否是第一组关节位置(reference_pos尚未赋值) if reference_pos is None: # 将第一组关节位置赋值给reference_pos,作为参考位置 reference_pos = line_list # 调用逆解函数计算新的关节位置:发送"inverseKinematic"命令,参数为目标位姿和参考位置 suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) # 判断逆解是否成功且返回了有效的关节角度 if suc and angle is not None: # 将逆解得到的关节位置保存到new_joint_pos new_joint_pos = angle # 保存逆解得到的新关节位置 else: # 如果逆解失败,打印提示信息 print("逆解计算失败,使用原有关节位置") # 使用原始关节位置作为新关节位置 new_joint_pos = line_list # 失败时使用原位置 # 确定当前行的修改后关节位置:优先使用逆解得到的新位置,若不存在则使用原始位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list # 打印当前行的修改后关节位置 #print(modified_list) # 判断是否是第一行关节位置(i=0表示首次处理) if (i == 0): # 发送关节运动命令,将机器人移动到起始点,速度设置为100 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) # 打印运动命令的执行结果 #print(result) # 等待机器人停止运动 wait_stop(sock) # 初始化透传服务:设置前瞻距离400、时间5、平滑度1、禁用响应 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 400, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) # 暂停0.01秒,控制数据发送频率 time.sleep(sleep_time_test) # 循环计数器加1,记录已处理的行数 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十二段程序执行完") # 第十三段程序封装为函数 def program13(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第二段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time_test) #if(i >= turn_on_relay_start and i <= turn_on_relay_end): #turn_on_relay() #print("打开喷枪")#开喷枪 #if(i >= turn_off_relay_start and i <= turn_off_relay_end): #turn_off_relay() #print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十三段程序执行完") # 第十四段程序封装为函数,hjb改 def program14(sock): # 记录开始时间 start_time = time.time() i = 0 #turn_off_relay() #print("关闭喷枪") increments = [increments_x, increments_y, increments_z, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_2.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: print("Pose_2.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list #print(modified_list) if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed_test}) time.sleep(1) suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed_test}) time.sleep(1) suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed_test}) #print(result) wait_stop(sock) suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) #print(result) time.sleep(sleep_time_test) i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十四段程序执行完") # 第十五段程序封装为函数 def program15(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第四段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [increments_x, increments_y, increments_z, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): # 关节运动到起始点 suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) wait_stop(sock) # 等待机器人停止 #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": 100, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time_test) #if(i >= turn_on_relay_start and i <= turn_on_relay_end): #turn_on_relay() #print("打开喷枪")#开喷枪 #if(i >= turn_off_relay_start and i <= turn_off_relay_end): #turn_off_relay() #print("关闭喷枪")#关喷枪 i = i + 1 fo.close() end_time = time.time() print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第十五段程序执行完") if __name__ == "__main__": init_gpio() ser = serial_init() # 严格检查串口初始化结果 if not ser or not isinstance(ser, serial.Serial) or not ser.is_open: print("串口初始化失败,无法继续运行") # 确保在函数内部返回 exit() time.sleep(20) turn_off_relay()#默认关闭喷枪 #time.sleep(20) # 机器人IP地址 robot_ip = "192.168.1.100" result = connectETController(robot_ip) sock = None if len(result) == 2: conSuc, sock = result if conSuc: print("成功连接到机器人") ##########################################################################################################################新添加 # 1.上电设置 ret, result, id = sendCMD(sock, "set_robot_power_status", {"status": 1}) print("开始上电") time.sleep(20) print("开始检测上电状态") ret, result, id = sendCMD(sock, "get_robot_power_status") if result == 0: ret, result, id = sendCMD(sock, "set_robot_power_status", {"status": 1}) time.sleep(20) else : print("已上电") # 2. 清除报警 print("清除报警") ret, result, id = sendCMD(sock,"clearAlarm") print("清除报警返回值") print(result) time.sleep(5) print("报警清除完成") # 获取同步状态 suc, result , id = sendCMD(sock, "getMotorStatus") if result == 0: # 同步伺服编码器数据 suc,result,id = sendCMD(sock, "syncMotorStatus") if result: print("同步伺服编码器数据成功") else: print("同步伺服编码器数据失败") time. sleep (2) # 3. 编码器零位校准 suc, result, id = sendCMD(sock, "getServoStatus") print("获取伺服状态返回值") print(result) time.sleep(2) if result == 0: # 设置机械臂伺服状态为“开”(status=1) suc, result, id = sendCMD(sock, "set_servo_status", {"status": 1}) print("设置机械臂伺服状态返回值") print(result) time.sleep(5) suc, result, id = sendCMD(sock, "calibrate_encoder_zero_position") print("编码器校准返回值") print(result) time.sleep(5) if result == 1: print("编码器校准成功") else: print("编码器校准失败") ################################################################################################################################ # 初始化透传服务 suc, result, id = sendCMD(sock, "get_transparent_transmission_state") if id == 1: print(suc, result, id) if result == 1: suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") print("已清空透传缓存") time.sleep(0.5) else: print("无缓存") # 在此处做检测等相关操作 suc, result, id = sendCMD(sock, "getRobotState") if (result == 4): # 清除报警 suc, result, id = sendCMD(sock, "clearAlarm") time.sleep(0.5) # 获取同步状态 suc, result, id = sendCMD(sock, "getMotorStatus") if (result == 0): # 同步伺服编码器数据 suc, result, id = sendCMD(sock, "syncMotorStatus") time.sleep(0.5) else: print("同步状态获取失败") # 获取机械臂伺服状态 suc, result, id = sendCMD(sock, "getServoStatus") if (result == 0): # 设置机械臂伺服状态ON suc, result, id = sendCMD(sock, "set_servo_status", {"status": 1}) time.sleep(1) else: print("机械臂伺服取失败") # 获取当前机器人是否处于透传状态 suc, result, id = sendCMD(sock, "get_transparent_transmission_state") print(result) if (result == 1): # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0.5) else: print("非缓存状态") time.sleep(0.01) print("初始化完成,等待指令...") # 用字典模拟switch-case逻辑 program_switch = { 1: program1, 2: program2, 3: program3, # 保留第三段程序的选择入口 4: program4, 5: program5, 6: program6, 7: program7, 8: program8, 9: program9,#停机 10:program10,#洗枪位置1 11:program11,#洗枪位置2 12:program12,#以下为试运行程序 13:program13, 14:program14, 15:program15, } # 需要在执行前接收额外数据的程序 #need_extra_data_programs = {1, 3, 5, 7, 12, 14} need_extra_data_programs = {1, 3, 5, 7} # 关键修改:启动串口接收线程(daemon=True:主程序退出时线程自动终止) receive_thread = threading.Thread(target=serial_receive_data, args=(ser,), daemon=True) receive_thread.start() print("串口接收线程已启动") # 定义四种循环序列 sequence_1 = [2, 3, 4, 1] # 情况1的循环序列 sequence_2 = [4, 1, 2, 3] # 情况2的循环序列 sequence_3 = [6, 7, 8, 5] # 情况3的循环序列 sequence_4 = [8, 5, 6, 7] # 情况4的循环序列 sequence_5 = [13, 14, 15, 12] # 情况4的循环序列 #坐标位置补偿 increments_x = 0 increments_y = 0 increments_z = 0 delay_time_1357 = 5 #在衔接处轨迹延时时间 delay_time_2468 = 0.5 #在上升、下降轨迹延时时间 delay_time_10_13579_30 = 5 # 外层主循环 while True: try: # 等待主指令(从共享缓冲区读取) print("\n等待串口指令...") main_cmd_bytes = None while main_cmd_bytes is None: main_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) # 解析主指令(帧头后第一字节为指令值) cmd = main_cmd_bytes[1] print(f"收到串口指令: 0x{cmd:02X}") # 情况1: 接收到1 if cmd == 1: # 坐标位置补偿 increments_x = 0 increments_y = 0 increments_z = 0 print("触发情况1: 执行program1") program_switch[1](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program1结束,已发送响应0x80 0x80") # 等待子指令接收 sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] # 应为0xAA sub_cmd1 = sub_cmd_bytes[1] # 子指令值(如0x20、0x0a) print(f"收到子指令: 0x{sub_cmd:02X} 0x{sub_cmd1:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况1循环序列: program2->3->4->1...") current_index = 0 running = True while running: # 执行当前序列中的程序 current_program = sequence_1[current_index] print(f"执行序列程序: {current_program}") # 优化:先检查缓冲区是否有数据,再决定是否解析(非阻塞) if current_program in need_extra_data_programs: print(f"program{current_program}执行前,尝试读取补偿数据...") # 先检查缓冲区是否有足够数据,无则直接跳过解析 with serial_shared.lock: has_data = len(serial_shared.buffer) >= 7 if has_data: success, x_coord, y_coord, z_coord = parse_frame_threaded() if success: increments_x, increments_y, increments_z = transform_data(x_coord,y_coord, z_coord) else: print("缓冲区无补偿数据,使用原有补偿值") program_switch[current_program](sock) print(current_program) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 检查是否需要延时 if current_program == 2: print("program2执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 3: print("program3执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 4: print("program4执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 1: print("program1执行完成,延时5秒...") time.sleep(delay_time_1357) else: print(f"当前执行程序是: {current_program}") # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) # 回到洗枪位置指令 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") # 情况2: 接收到3 elif cmd == 3: # 坐标位置补偿 increments_x = 0 increments_y = 0 increments_z = 0 print("触发情况2: 执行program3") program_switch[3](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program3结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况2循环序列: program4->1->2->3...") current_index = 0 running = True while running: current_program = sequence_2[current_index] print(f"执行序列程序: {current_program}") # 优化:先检查缓冲区是否有数据,再决定是否解析(非阻塞) if current_program in need_extra_data_programs: print(f"program{current_program}执行前,尝试读取补偿数据...") # 先检查缓冲区是否有足够数据,无则直接跳过解析 with serial_shared.lock: has_data = len(serial_shared.buffer) >= 7 if has_data: success, x_coord, y_coord, z_coord = parse_frame_threaded() if success: increments_x, increments_y, increments_z = transform_data(x_coord,y_coord, z_coord) else: print("缓冲区无补偿数据,使用原有补偿值") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 检查是否需要延时 if current_program == 4: print("program4执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 1: print("program1执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 2: print("program2执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 3: print("program3执行完成,延时5秒...") time.sleep(delay_time_1357) # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) #回到洗枪位置 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd1:02X},继续等待") # 情况3: 接收到5 elif cmd == 5: # 坐标位置补偿 increments_x = 0 increments_y = 0 increments_z = 0 print("触发情况3: 执行program5") program_switch[5](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program5结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况3循环序列: program6->7->8->5...") current_index = 0 running = True while running: current_program = sequence_3[current_index] print(f"执行序列程序: {current_program}") # 优化:先检查缓冲区是否有数据,再决定是否解析(非阻塞) if current_program in need_extra_data_programs: print(f"program{current_program}执行前,尝试读取补偿数据...") # 先检查缓冲区是否有足够数据,无则直接跳过解析 with serial_shared.lock: has_data = len(serial_shared.buffer) >= 7 if has_data: success, x_coord, y_coord, z_coord = parse_frame_threaded() if success: increments_x, increments_y, increments_z = transform_data(x_coord,y_coord, z_coord) else: print("缓冲区无补偿数据,使用原有补偿值") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 检查是否需要延时 if current_program == 6: print("program6执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 7: print("program7执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 8: print("program8执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 5: print("program5执行完成,延时5秒...") time.sleep(delay_time_1357) # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) #回到洗枪位置指令 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") # 情况4: 接收到7 elif cmd == 7: # 坐标位置补偿 increments_x = 0 increments_y = 0 increments_z = 0 print("触发情况4: 执行program7") program_switch[7](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program7结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况4循环序列: program8->5->6->7...") current_index = 0 running = True while running: current_program = sequence_4[current_index] print(f"执行序列程序: {current_program}") # 优化:先检查缓冲区是否有数据,再决定是否解析(非阻塞) if current_program in need_extra_data_programs: print(f"program{current_program}执行前,尝试读取补偿数据...") # 先检查缓冲区是否有足够数据,无则直接跳过解析 with serial_shared.lock: has_data = len(serial_shared.buffer) >= 7 if has_data: success, x_coord, y_coord, z_coord = parse_frame_threaded() if success: increments_x, increments_y, increments_z = transform_data(x_coord,y_coord, z_coord) else: print("缓冲区无补偿数据,使用原有补偿值") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 检查是否需要延时 if current_program == 8: print("program8执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 5: print("program5执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 6: print("program6执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 7: print("program7执行完成,延时5秒...") time.sleep(delay_time_1357) # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) #回到洗枪位置指令 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") # 试运行程序段,与情况1相同,只不过速度较慢 elif cmd == 12: # 坐标位置补偿 increments_x = 0 increments_y = 0 increments_z = 0 print("触发情况12: 执行program12") program_switch[12](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program12结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA and sub_cmd1 == 0x20: print("开始情况1循环序列: program13->14->15->12...") current_index = 0 running = True while running: # 执行当前序列中的程序 current_program = sequence_5[current_index] print(f"执行序列程序: {current_program}") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 检查是否需要延时 if current_program == 13: print("program13执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 14: print("program14执行完成,延时5秒...") time.sleep(delay_time_1357) elif current_program == 15: print("program15执行完成,延时0.5秒...") time.sleep(delay_time_2468) elif current_program == 12: print("program12执行完成,延时5秒...") time.sleep(delay_time_1357) # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.1) # 情况5: 接收到9 elif cmd == 9: print("触发情况5: 执行program9") program_switch[9](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program9结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 情况6: 接收到10 elif cmd == 10: print("触发情况6: 执行program10") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 情况7: 接收到11 elif cmd == 11: print("触发情况7: 执行program11") program_switch[11](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 退出指令 elif cmd == 0x00: print("收到退出指令,程序终止") break elif cmd == 16: #开喷枪 turn_on_relay() print("打开喷枪") elif cmd == 17: #关喷枪 turn_off_relay() print("关闭喷枪") # 未知指令 else: print(f"未知指令: 0x{cmd:02X},等待下一条指令...") time.sleep(0.01) except KeyboardInterrupt: print("程序被手动中断") break except Exception as e: print(f"主循环错误: {e}") time.sleep(0.5) else: print("连接机器人失败") else: print("无法连接到机器人") # 资源清理 if sock: sock.close() if ser and isinstance(ser, serial.Serial) and ser.is_open: ser.close() print("串口已关闭") GPIO.cleanup()