import socket import json import numpy as np import math import RPi.GPIO as GPIO import time import serial # 保留对serial模块的引用 from serial import Serial # 同时导入Serial类 import struct import threading # 用于创建串口接收线程 from queue import Queue # 新增:导入队列模块 # 新增:全局坐标补偿变量 global increments_x, increments_y, increments_z increments_x = 0 increments_y = 0 increments_z = 0 # 新增:补偿值更新队列(解耦线程通信,避免锁竞争) compensate_queue = Queue(maxsize=1) # 最多缓存1个最新值,确保数据时效性 # 坐标补偿量,仅更新于program1、program3、program5、program7 global Coordinate_compensation Coordinate_compensation = [0, 0, 0, 0, 0, 0] # GPIO 代码 # 定义继电器引脚 EN_Relay = 17 #预期的帧长度,可根据实际情况修改 EXPECTED_LENGTH = 7 #采样时间、延时时间和衔接机器人速度 sample_time = 20 #50 sleep_time = 0.02 #0.008 #0.005 move_speed = 100 lookahead_time = 200 #前瞻时间 #试运行速度 sample_time_test = 100 #3 sleep_time_test = 0.05 #0.003 move_speed_test = 20 #喷枪开启和关闭位置 turn_on_relay_start = 20 turn_on_relay_end = 25 turn_off_relay_start = 135 turn_off_relay_end = 140 # 定义转换矩阵R_T R_T = np.array([ [-0.7206, -0.6908, 0], [0.6884, -0.7231, 0], [0.0830, 0, 1] ]) def init_gpio(): """初始化GPIO设置""" GPIO.setmode(GPIO.BCM) GPIO.setup(EN_Relay, GPIO.OUT) # 初始化为高电平(关闭状态) GPIO.output(EN_Relay, GPIO.LOW) time.sleep(1) # 等待稳定 def turn_on_relay(): """打开继电器(设置为LOW)""" try: GPIO.output(EN_Relay, GPIO.HIGH) #print('Relay turned on (LOW)') except Exception as e: print(f"Error turning on relay: {e}") def turn_off_relay(): """关闭继电器(设置为HIGH)""" try: GPIO.output(EN_Relay, GPIO.LOW) #print('Relay turned off (HIGH)') except Exception as e: print(f"Error turning off relay: {e}") # 初始化串口 def serial_init(): try: ser = serial.Serial( port="/dev/ttyS0", # 笔记本Windows端口,Linux/Mac替换为"/dev/ttyUSB0" baudrate=115200, timeout=0.1 ) print("串口初始化成功") return ser except Exception as e: print(f"笔记本串口初始化失败: {e}") return None # 矩阵转换函数:将P_L1_O2通过R_T转换为P_L1_O1 def transform_data(x_coord, y_coord, z_coord): # 将接收到的3个坐标组成列矩阵P_L1_O2 P_L1_O2 = np.array([[x_coord], [y_coord], [z_coord]], dtype=np.float64) # 执行矩阵乘法:P_L1_O1 = R_T * P_L1_O2 P_L1_O1 = np.dot(R_T, P_L1_O2) # 提取转换后的三个数据 increments_x = P_L1_O1[0][0] increments_y = P_L1_O1[1][0] increments_z = P_L1_O1[2][0] #print(f"矩阵转换完成 - increments_x: {increments_x:.4f}, increments_y: {increments_y:.4f}, increments_z: {increments_z:.4f}") return increments_x, increments_y, increments_z class SerialSharedData: """线程安全的串口数据共享存储类""" def __init__(self): self.lock = threading.Lock() # 防止多线程数据竞争 self.buffer = b'' # 用于解析0xFF坐标帧的通用缓冲区 self.cmd_buffer = b'' # 用于解析0xAA指令帧(主指令/子指令/停止指令)的缓冲区 # 初始化全局共享数据实例 serial_shared = SerialSharedData() def serial_receive_data(ser): """线程内持续接收串口数据,同步解析0xFF坐标帧并更新全局补偿变量""" global increments_x, increments_y, increments_z frame_header = 0xFF frame_length = 7 # 0xFF + x(2) + y(2) + z(2) while True: try: if not ser or not isinstance(ser, serial.Serial) or not ser.is_open: time.sleep(0.1) continue if ser.in_waiting > 0: data = ser.read(ser.in_waiting) with serial_shared.lock: serial_shared.buffer += data serial_shared.cmd_buffer += data # 新增:在接收线程中解析0xFF坐标帧 with serial_shared.lock: current_buf = serial_shared.buffer[:] if len(current_buf) >= frame_length: header_index = current_buf.find(bytes([frame_header])) if header_index != -1 and header_index + frame_length <= len(current_buf): # 提取完整帧并更新缓冲区 frame = current_buf[header_index:header_index+frame_length] with serial_shared.lock: serial_shared.buffer = serial_shared.buffer[header_index+frame_length:] # 坐标解码 x_coord = (frame[1] << 8) | frame[2] x_coord = x_coord - 0x10000 if x_coord & 0x8000 else x_coord y_coord = (frame[3] << 8) | frame[4] y_coord = y_coord - 0x10000 if y_coord & 0x8000 else y_coord z_coord = (frame[5] << 8) | frame[6] z_coord = z_coord - 0x10000 if z_coord & 0x8000 else z_coord # 坐标系转换(直接调用原有函数) inc_x, inc_y, inc_z = transform_data(x_coord, y_coord, z_coord) # 存入队列(覆盖旧值,确保取到最新) if not compensate_queue.empty(): compensate_queue.get_nowait() compensate_queue.put((inc_x, inc_y, inc_z)) #print(f"接收线程解析更新 - x:{increments_x:.4f}, y:{increments_y:.4f}, z:{increments_z:.4f}") except Exception as e: print(f"串口接收线程错误: {str(e)}") time.sleep(0.005)# 关键修改:延长线程休眠时间,降低CPU占用 def read_cmd_from_shared(timeout=1.0): frame_header = 0xAA cmd_length = 2 # 指令帧结构:0xAA(帧头) + 1字节指令值 start_time = time.time() while (time.time() - start_time) < timeout: # 加锁读取指令缓冲区 with serial_shared.lock: current_cmd_buf = serial_shared.cmd_buffer[:] if len(current_cmd_buf) >= cmd_length: header_index = current_cmd_buf.find(bytes([frame_header])) if header_index != -1: # 提取完整指令帧 cmd_bytes = current_cmd_buf[header_index:header_index+cmd_length] # 更新共享缓冲区(移除已处理指令) with serial_shared.lock: serial_shared.cmd_buffer = serial_shared.cmd_buffer[header_index+cmd_length:] return cmd_bytes time.sleep(0.01) return None # 超时返回None # 连接机器人控制器 def connectETController(ip, port=8055): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect((ip, port)) return (True, sock) except Exception as e: sock.close() return (False,) # 发送命令并接收响应 def sendCMD(sock, cmd, params=None, id=1): if not params: params = [] else: params = json.dumps(params) sendStr = '{"method":"%s","params":%s,"jsonrpc":"2.0","id":%d}' % (cmd, params, id) + "\n" try: sock.sendall(bytes(sendStr, "utf-8")) ret = sock.recv(1024) jdata = json.loads(str(ret, "utf-8")) if "result" in jdata.keys(): return (True, json.loads(jdata["result"]), jdata["id"]) elif "error" in jdata.keys(): return (False, jdata["error"], jdata["id"]) else: return (False, None, None) except Exception as e: return (False, None, None) def send_Point(sock, cmd, params=None, id=1): if (not params): params = [] else: params = json.dumps(params) sendStr = "{{\"method\":\"{0}\",\"params\":{1},\"jsonrpc\":\"2.0\",\"id\":{2}}}".format(cmd, params, id) + "\n" sock.sendall(bytes(sendStr, "utf-8")) def wait_stop(sock): # 修正:添加sock参数 while True: time.sleep(0.1) ret1, result1, id1 = sendCMD(sock, "getRobotState") # getRobotstate if (ret1): if result1 == 0 or result1 == 4: break else: #print("getRobotState failed") continue #需要做8端程序, # 第1段,起点处的关节位置(txt);第2段,上升喷涂(txt); # 第3段,起点初的关节位置(txt);第4段,上升喷涂(txt); # 第5段,起点初的关节位置(txt);第6段,上升喷涂(txt); # 第7段,起点初的关节位置(txt);第8段,上升喷涂(txt); # 第一段程序封装为函数 def program1(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") turn_off_relay() global increments_x, increments_y, increments_z global Coordinate_compensation # 从队列获取最新补偿值(非阻塞,无新值则使用当前值) if not compensate_queue.empty(): increments_x, increments_y, increments_z = compensate_queue.get_nowait() increments = [increments_x, increments_y, increments_z, 0, 0, 0] Coordinate_compensation = increments reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_1.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)] target_pose = Modified_First_Position else: #print("Pose_1.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): while True: suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) if suc: break print("moveByJoint命令发送失败,重试...") time.sleep(0.1) wait_stop(sock) # 初始化透传服务 #suc, result, id = sendCMD(sock, "transparent_transmission_init", { #"lookahead": 10, "t": sample_time, "smoothness": 1, "response_enable": 0}) # 发送当前关节点信息到透传缓存,用于后续的伺服控制 #send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(sleep_time) i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() print("第一段程序执行完") # 第二段程序封装为函数 def program2(sock): start_time = time.time() global increments_x, increments_y, increments_z global Coordinate_compensation i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) print("第二段程序开始执行") file_name = '/home/raspberrypi/robot1/Pose_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = Coordinate_compensation modified_list = [val + inc for val, inc in zip(line_list, increments)] if (i == 0): suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0}) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() print("第二段程序执行完") # 第三段程序封装为函数,hjb改 def program3(sock): start_time = time.time() i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") turn_off_relay() global increments_x, increments_y, increments_z global Coordinate_compensation # 从队列获取最新补偿值(非阻塞,无新值则使用当前值) if not compensate_queue.empty(): increments_x, increments_y, increments_z = compensate_queue.get_nowait() increments = [increments_x, increments_y, increments_z, 0, 0, 0] Coordinate_compensation = increments reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_2.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)] target_pose = Modified_First_Position else: #print("Pose_2.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list #print(modified_list) if (i == 0): while True: suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) if suc: break print("moveByJoint命令发送失败,重试...") time.sleep(0.1) wait_stop(sock) #suc, result, id = sendCMD(sock, "transparent_transmission_init", { #"lookahead": 10, "t": sample_time, "smoothness": 1, "response_enable": 0}) #send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(sleep_time) i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() print("第三段程序执行完") # 第四段程序封装为函数 def program4(sock): print("第四段程序开始执行") start_time = time.time() global increments_x, increments_y, increments_z global Coordinate_compensation i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) file_name = '/home/raspberrypi/robot1/Pose_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = Coordinate_compensation modified_list = [val + inc for val, inc in zip(line_list, increments)] #print(modified_list) if (i == 0): suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0}) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() print("关闭喷枪")#关喷枪 i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() #print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第四段程序执行完") # 第五段程序封装为函数,hjb改 def program5(sock): start_time = time.time() i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") turn_off_relay() global increments_x, increments_y, increments_z global Coordinate_compensation if not compensate_queue.empty(): increments_x, increments_y, increments_z = compensate_queue.get_nowait() increments = [increments_x, increments_y, increments_z, 0, 0, 0] Coordinate_compensation = increments reference_pos = None # 初始化参考位置 new_joint_pos = None # 初始化逆解得到的新关节位置 pose_file = open('/home/raspberrypi/robot1/Pose_3.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)] target_pose = Modified_First_Position else: #print("Pose_3.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_3.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list # 失败时使用原位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): while True: suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) if suc: break print("moveByJoint命令发送失败,重试...") time.sleep(0.1) wait_stop(sock) #suc, result, id = sendCMD(sock, "transparent_transmission_init", { #"lookahead": 10, "t": sample_time, "smoothness": 1, "response_enable": 0}) #send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(sleep_time) i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() print("第五段程序执行完") # 第六段程序封装为函数 def program6(sock): print("第六段程序开始执行") start_time = time.time() # 返回当前时间的时间戳(秒) global increments_x, increments_y, increments_z global Coordinate_compensation i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) file_name = '/home/raspberrypi/robot1/Pose_3.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = Coordinate_compensation modified_list = [val + inc for val, inc in zip(line_list, increments)] if (i == 0): suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0}) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) #print(result) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() #print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() #print("关闭喷枪")#关喷枪 i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() print("第六段程序执行完") # 第七段程序封装为函数,hjb改 def program7(sock): start_time = time.time() i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") turn_off_relay() global increments_x, increments_y, increments_z global Coordinate_compensation if not compensate_queue.empty(): increments_x, increments_y, increments_z = compensate_queue.get_nowait() increments = [increments_x, increments_y, increments_z, 0, 0, 0] Coordinate_compensation = increments reference_pos = None # 初始化参考位置 new_joint_pos = None # 初始化逆解得到的新关节位置 pose_file = open('/home/raspberrypi/robot1/Pose_4.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, Coordinate_compensation)] target_pose = Modified_First_Position else: #print("Pose_4.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_4.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): while True: suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) if suc: break print("moveByJoint命令发送失败,重试...") time.sleep(0.1) # 重试间隔,可根据需求调整 wait_stop(sock) #suc, result, id = sendCMD(sock, "transparent_transmission_init", { #"lookahead": 10, "t": sample_time, "smoothness": 1, "response_enable": 0}) #send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(sleep_time) i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() print("第七段程序执行完") # 第八段程序封装为函数 def program8(sock): print("第八段程序开始执行") start_time = time.time() global increments_x, increments_y, increments_z global Coordinate_compensation i = 0 # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) file_name = '/home/raspberrypi/robot1/Pose_4.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = Coordinate_compensation modified_list = [val + inc for val, inc in zip(line_list, increments)] if (i == 0): # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time, "smoothness": 1, "response_enable": 0}) #print(result) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) time.sleep(sleep_time) if(i >= turn_on_relay_start and i <= turn_on_relay_end): turn_on_relay() #print("打开喷枪")#开喷枪 if(i >= turn_off_relay_start and i <= turn_off_relay_end): turn_off_relay() #print("关闭喷枪")#关喷枪 i = i + 1 fo.close() suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") end_time = time.time() #print(f"程序执行时间:{end_time - start_time:.6f} 秒") print("第八段程序执行完") # 第九段程序封装为函数,hjb改 def program9(sock): start_time = time.time() i = 0 increments = [0, 0, 0, 0, 0, 0] reference_pos = None # 初始化参考位置 new_joint_pos = None # 初始化逆解得到的新关节位置 pose_file = open('/home/raspberrypi/robot1/Pose_5.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: #print("Pose_5.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_5.txt' fo = open(file_name, "r") while 1: # 读取一行内容 line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list # 失败时使用原位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) wait_stop(sock) # 等待机器人停止 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": 5, "smoothness": 1, "response_enable": 0}) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(0.01) i = i + 1 fo.close() end_time = time.time() print("第九段程序执行完") def program10(sock): start_time = time.time() i = 0 increments = [0, 0, 0, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_6.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: #print("Pose_6.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_6.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list # 失败时使用原位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) wait_stop(sock) suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": 5, "smoothness": 1, "response_enable": 0}) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(0.01) i = i + 1 fo.close() end_time = time.time() print("第十段程序执行完") def program11(sock): start_time = time.time() i = 0 increments = [0, 0, 0, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_7.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: #print("Pose_7.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_7.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list # 失败时使用原位置 modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) wait_stop(sock) # 等待机器人停止 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": 5, "smoothness": 1, "response_enable": 0}) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(0.01) i = i + 1 fo.close() end_time = time.time() print("第十一段程序执行完") # 第十二段程序封装为函数 def program12(sock): start_time = time.time() i = 0 increments = [0, 0, 0, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_1.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: #print("Pose_1.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) wait_stop(sock) suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(sleep_time_test) i = i + 1 fo.close() end_time = time.time() print("第十二段程序执行完") # 第十三段程序封装为函数 def program13(sock): print("第十三段程序开始执行") start_time = time.time() i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) file_name = '/home/raspberrypi/robot1/Pose_1.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [0, 0, 0, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] if (i == 0): #suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) #print(result) #wait_stop(sock) #print(1) # 初始化透传服务 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) time.sleep(sleep_time_test) i = i + 1 fo.close() end_time = time.time() print("第十三段程序执行完") # 第十四段程序封装为函数,hjb改 def program14(sock): start_time = time.time() i = 0 increments = [0, 0, 0, 0, 0, 0] reference_pos = None new_joint_pos = None pose_file = open('/home/raspberrypi/robot1/Pose_2.txt', "r") first_pose_line = pose_file.readline().strip() if first_pose_line: Path_First_Position = list(map(float, first_pose_line.split(','))) Modified_First_Position = [val + inc for val, inc in zip(Path_First_Position, increments)] target_pose = Modified_First_Position else: #print("Pose_2.txt为空,无法获取初始位姿") pose_file.close() return pose_file.close() file_name = '/home/raspberrypi/robot1/joint_positions_assembled_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) if reference_pos is None: reference_pos = line_list suc, angle, id = sendCMD(sock, "inverseKinematic", {"targetPose": target_pose, "referencePos": reference_pos}) if suc and angle is not None: new_joint_pos = angle else: #print("逆解计算失败,使用原有关节位置") new_joint_pos = line_list modified_list = new_joint_pos if new_joint_pos is not None else line_list if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": move_speed_test}) time.sleep(1) wait_stop(sock) suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPos": modified_list}) time.sleep(sleep_time_test) i = i + 1 fo.close() end_time = time.time() print("第十四段程序执行完") # 第十五段程序封装为函数 def program15(sock): start_time = time.time() # 返回当前时间的时间戳(秒) i = 0 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0) file_name = '/home/raspberrypi/robot1/Pose_2.txt' fo = open(file_name, "r") while 1: line = fo.readline() if not line: break line_list = line.strip() line_list = list(map(float, line_list.split(','))) increments = [0, 0, 0, 0, 0, 0] modified_list = [val + inc for val, inc in zip(line_list, increments)] if (i == 0): suc, result, id = sendCMD(sock, "moveByJoint", {"targetPos": modified_list, "speed": 100}) wait_stop(sock) # 等待机器人停止 suc, result, id = sendCMD(sock, "transparent_transmission_init", { "lookahead": lookahead_time, "t": sample_time_test, "smoothness": 1, "response_enable": 0}) # 添加透传伺服目标关节点信息到缓存中 send_Point(sock, "tt_put_servo_joint_to_buf", {"targetPose": modified_list}) time.sleep(sleep_time_test) i = i + 1 fo.close() end_time = time.time() print("第十五段程序执行完") if __name__ == "__main__": init_gpio() ser = serial_init() # 严格检查串口初始化结果 if not ser or not isinstance(ser, serial.Serial) or not ser.is_open: print("串口初始化失败,无法继续运行") # 确保在函数内部返回 exit() time.sleep(20) turn_off_relay() # 默认关闭喷枪 # 机器人IP地址 robot_ip = "192.168.1.100" result = connectETController(robot_ip) sock = None if len(result) == 2: conSuc, sock = result if conSuc: print("成功连接到机器人") ##########################################################################################################################新添加 # 1.上电设置 ret, result, id = sendCMD(sock, "set_robot_power_status", {"status": 1}) print("开始上电") time.sleep(20) print("开始检测上电状态") ret, result, id = sendCMD(sock, "get_robot_power_status") if result == 0: ret, result, id = sendCMD(sock, "set_robot_power_status", {"status": 1}) time.sleep(20) else : print("已上电") # 2. 清除报警 print("清除报警") ret, result, id = sendCMD(sock,"clearAlarm") print("清除报警返回值") print(result) time.sleep(5) print("报警清除完成") # 获取同步状态 suc, result , id = sendCMD(sock, "getMotorStatus") if result == 0: # 同步伺服编码器数据 suc,result,id = sendCMD(sock, "syncMotorStatus") if result: print("同步伺服编码器数据成功") else: print("同步伺服编码器数据失败") time. sleep (2) # 3. 编码器零位校准 suc, result, id = sendCMD(sock, "getServoStatus") print("获取伺服状态返回值") print(result) time.sleep(2) if result == 0: # 设置机械臂伺服状态为“开”(status=1) suc, result, id = sendCMD(sock, "set_servo_status", {"status": 1}) print("设置机械臂伺服状态返回值") print(result) time.sleep(5) suc, result, id = sendCMD(sock, "calibrate_encoder_zero_position") print("编码器校准返回值") print(result) time.sleep(5) if result == 1: print("编码器校准成功") else: print("编码器校准失败") ################################################################################################################################ # 初始化透传服务 suc, result, id = sendCMD(sock, "get_transparent_transmission_state") if id == 1: print(suc, result, id) if result == 1: suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") print("已清空透传缓存") time.sleep(0.5) else: print("无缓存") # 在此处做检测等相关操作 suc, result, id = sendCMD(sock, "getRobotState") if (result == 4): # 清除报警 suc, result, id = sendCMD(sock, "clearAlarm") time.sleep(0.5) # 获取同步状态 suc, result, id = sendCMD(sock, "getMotorStatus") if (result == 0): # 同步伺服编码器数据 suc, result, id = sendCMD(sock, "syncMotorStatus") time.sleep(0.5) else: print("同步状态获取失败") # 获取机械臂伺服状态 suc, result, id = sendCMD(sock, "getServoStatus") if (result == 0): # 设置机械臂伺服状态ON suc, result, id = sendCMD(sock, "set_servo_status", {"status": 1}) time.sleep(1) else: print("机械臂伺服取失败") # 获取当前机器人是否处于透传状态 suc, result, id = sendCMD(sock, "get_transparent_transmission_state") print(result) if (result == 1): # 清空透传缓存 suc, result, id = sendCMD(sock, "tt_clear_servo_joint_buf") time.sleep(0.5) else: print("非缓存状态") time.sleep(0.01) print("初始化完成,等待指令...") # 用字典模拟switch-case逻辑 program_switch = { 1: program1, 2: program2, 3: program3, # 保留第三段程序的选择入口 4: program4, 5: program5, 6: program6, 7: program7, 8: program8, 9: program9,#停机 10:program10,#洗枪位置1 11:program11,#洗枪位置2 12:program12,#以下为试运行程序 13:program13, 14:program14, 15:program15, } # 关键修改:启动串口接收线程(daemon=True:主程序退出时线程自动终止) receive_thread = threading.Thread(target=serial_receive_data, args=(ser,), daemon=True) receive_thread.start() print("串口接收线程已启动") # 定义四种循环序列 sequence_1 = [2, 3, 4, 1] # 情况1的循环序列 sequence_2 = [4, 1, 2, 3] # 情况2的循环序列 sequence_3 = [6, 7, 8, 5] # 情况3的循环序列 sequence_4 = [8, 5, 6, 7] # 情况4的循环序列 sequence_5 = [13, 14, 15, 12] # 情况4的循环序列 # 新增:程序编号与延时时间映射字典(按需求精准配置) program_delay_map = { 1: 1, # program1延时5秒 2: 1.5, # program2延时0.5秒 3: 1, # program3延时5秒 4: 1.5, # program4延时0.5秒 5: 1, # program5延时5秒 6: 1.5, # program6延时0.5秒 7: 1, # program7延时5秒 8: 1.5, # program8延时0.5秒 12: 5, # program12延时5秒 13: 0.5, # program13延时0.5秒 14: 5, # program14延时5秒 15: 0.5, # program4延时0.5秒 } delay_time_10_13579_30 = 2 # 外层主循环 while True: try: # 等待主指令(从共享缓冲区读取) print("\n等待串口指令...") main_cmd_bytes = None while main_cmd_bytes is None: main_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) # 解析主指令(帧头后第一字节为指令值) cmd = main_cmd_bytes[1] print(f"收到串口指令: 0x{cmd:02X}") # 情况1: 接收到1 if cmd == 1: print("触发情况1: 执行program1") program_switch[1](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program1结束,已发送响应0x80 0x80") # 等待子指令接收 sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] # 应为0xAA sub_cmd1 = sub_cmd_bytes[1] # 子指令值(如0x20、0x0a) print(f"收到子指令: 0x{sub_cmd:02X} 0x{sub_cmd1:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况1循环序列: program2->3->4->1...") current_index = 0 running = True while running: # 执行当前序列中的程序 current_program = sequence_1[current_index] #print(f"执行序列程序: {current_program}") program_switch[current_program](sock) print(current_program) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 关键修改:按程序编号从字典获取延时,精准执行 delay = program_delay_map.get(current_program, 0.0) if delay > 0: print(f"program{current_program}执行完成,延时{delay}秒...") time.sleep(delay) else: print(f"当前执行程序是: {current_program}(无指定延时)") # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_1) # 注意这里要换成当前使用的 sequence time.sleep(0.05) # 回到洗枪位置指令 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") # 情况2: 接收到3 elif cmd == 3: print("触发情况2: 执行program3") program_switch[3](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program3结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况2循环序列: program4->1->2->3...") current_index = 0 running = True while running: current_program = sequence_2[current_index] print(f"执行序列程序: {current_program}") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 关键修改:统一字典映射延时 delay = program_delay_map.get(current_program, 0.0) if delay > 0: print(f"program{current_program}执行完成,延时{delay}秒...") time.sleep(delay) else: print(f"当前执行程序是: {current_program}(无指定延时)") # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_2) # 注意这里要换成当前使用的 sequence time.sleep(0.05) #回到洗枪位置 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd1:02X},继续等待") # 情况3: 接收到5 elif cmd == 5: print("触发情况3: 执行program5") program_switch[5](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program5结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况3循环序列: program6->7->8->5...") current_index = 0 running = True while running: current_program = sequence_3[current_index] print(f"执行序列程序: {current_program}") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 关键修改:统一字典映射延时 delay = program_delay_map.get(current_program, 0.0) if delay > 0: print(f"program{current_program}执行完成,延时{delay}秒...") time.sleep(delay) else: print(f"当前执行程序是: {current_program}(无指定延时)") # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_3) # 注意这里要换成当前使用的 sequence time.sleep(0.05) #回到洗枪位置指令 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") # 情况4: 接收到7 elif cmd == 7: print("触发情况4: 执行program7") program_switch[7](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program7结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA: if sub_cmd1 == 0x20: print("开始情况4循环序列: program8->5->6->7...") current_index = 0 running = True while running: current_program = sequence_4[current_index] print(f"执行序列程序: {current_program}") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 关键修改:统一字典映射延时 delay = program_delay_map.get(current_program, 0.0) if delay > 0: print(f"program{current_program}执行完成,延时{delay}秒...") time.sleep(delay) else: print(f"当前执行程序是: {current_program}(无指定延时)") # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_4) # 注意这里要换成当前使用的 sequence time.sleep(0.05) #回到洗枪位置指令 elif sub_cmd1 == 0x0a: print(f"接收到指令0xAA 0x{sub_cmd1:02X},切换执行对应程序") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") else: print(f"收到未知子指令: 0x{sub_cmd:02X},继续等待") # 试运行程序段,与情况1相同,只不过速度较慢 elif cmd == 12: print("触发情况12: 执行program12") program_switch[12](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program12结束,已发送响应0x80 0x80") # 子指令接收(调用read_cmd_from_shared) print("等待0x20开始循环或其他指令...") sub_cmd_bytes = None while sub_cmd_bytes is None: sub_cmd_bytes = read_cmd_from_shared(timeout=1.0) time.sleep(0.01) sub_cmd = sub_cmd_bytes[0] sub_cmd1 = sub_cmd_bytes[1] print(f"收到子指令: 0x{sub_cmd:02X}") if sub_cmd == 0xAA and sub_cmd1 == 0x20: print("开始情况1循环序列: program13->14->15->12...") current_index = 0 running = True while running: # 执行当前序列中的程序 current_program = sequence_5[current_index] print(f"执行序列程序: {current_program}") program_switch[current_program](sock) # 读取可能的停止指令(超时0.1秒,不阻塞循环) stop_cmd_bytes = read_cmd_from_shared(timeout=0.1) if stop_cmd_bytes is not None and stop_cmd_bytes == b'\xAA\x30': print("收到0xAA 0x30,停止循环") ser.write(b'\x80\x80') ser.flush() running = False time.sleep(0.5) # 关键修改:统一字典映射延时 delay = program_delay_map.get(current_program, 0.0) if delay > 0: print(f"program{current_program}执行完成,延时{delay}秒...") time.sleep(delay) else: print(f"当前执行程序是: {current_program}(无指定延时)") # 移动到下一个程序索引 current_index = (current_index + 1) % len(sequence_5) # 注意这里要换成当前使用的 sequence time.sleep(0.1) # 情况5: 接收到9 elif cmd == 9: print("触发情况5: 执行program9") program_switch[9](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program9结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 情况6: 接收到10 elif cmd == 10: print("触发情况6: 执行program10") program_switch[10](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 情况7: 接收到11 elif cmd == 11: print("触发情况7: 执行program11") program_switch[11](sock) time.sleep(delay_time_10_13579_30) ser.write(b'\x80\x80') ser.flush() print("执行program10结束,已发送响应0x80 0x80") print("返回最外层等待串口数据") # 退出指令 elif cmd == 0x00: print("收到退出指令,程序终止") break # 未知指令 else: print(f"未知指令: 0x{cmd:02X},等待下一条指令...") time.sleep(0.01) except KeyboardInterrupt: print("程序被手动中断") break except Exception as e: print(f"主循环错误: {e}") time.sleep(0.5) else: print("连接机器人失败") else: print("无法连接到机器人") # 资源清理 if sock: sock.close() if ser and isinstance(ser, serial.Serial) and ser.is_open: ser.close() print("串口已关闭")