robotengine.serial_io
serial_io 是 robotengine 控制硬件串口的节点。
1""" 2 3serial_io 是 robotengine 控制硬件串口的节点。 4 5""" 6 7from .node import Node 8import serial.tools.list_ports 9import serial 10from enum import Enum 11import random 12from robotengine.tools import hex2str, warning, error, info 13 14class DeviceType(Enum): 15 """ 设备类型枚举 """ 16 STM32F407 = 0 17 """ STM32F407 设备类型 """ 18 ARDUINO_MEGA2560 = 1 19 """ Arduino Mega2560 设备类型 """ 20 21class CheckSumType(Enum): 22 """ 校验和类型枚举 """ 23 NONE = 0 24 """ 无校验和 """ 25 SUM8 = 1 26 """ SUM8 校验和 """ 27 SUM16 = 2 28 """ SUM16 校验和 """ 29 XOR8 = 3 30 """ XOR8 校验和 """ 31 XOR16 = 4 32 """ XOR16 校验和 """ 33 CRC8 = 5 34 """ CRC8 校验和 """ 35 CRC16 = 6 36 """ CRC16 校验和 """ 37 38checksum_length_map = { 39 CheckSumType.SUM8: 1, 40 CheckSumType.SUM16: 2, 41 CheckSumType.XOR8: 1, 42 CheckSumType.XOR16: 2, 43 CheckSumType.CRC8: 1, 44 CheckSumType.CRC16: 2 45 } 46""" 校验和长度映射表 """ 47 48class SerialIO(Node): 49 """ 串口节点 """ 50 def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True): 51 """ 52 初始化串口节点。 53 54 :param name: 节点名称。 55 :param device_type: 设备类型。 56 :param checksum_type: 校验和类型。 57 :param header: 数据头。 58 :param baudrate: 波特率。 59 :param timeout: 超时时间。 60 """ 61 super().__init__(name) 62 self._device_type = device_type 63 self._checksum_type = checksum_type 64 self._header = header 65 self._header_flag = 0 66 self._device = None 67 self._serial: serial.Serial = None 68 self._baudrate = baudrate 69 self._timeout = timeout 70 71 self._warn = warn 72 self._receive_data = bytes() 73 74 self._initialize() 75 if self._device is None and self._warn: 76 warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试") 77 78 def _update(self, delta) -> None: 79 if self._device is None: 80 self._initialize() 81 return 82 83 def _initialize(self): 84 self._device = self._find_device() 85 if self._device: 86 info(f"节点 {self.name} 初始化时检测到 {self._device_type} 设备,串口为 {self._device},波特率为 {self._baudrate}") 87 self._serial = serial.Serial(self._device, self._baudrate, timeout=self._timeout) 88 # 清空串口缓冲区 89 self._serial.reset_input_buffer() 90 self._serial.reset_output_buffer() 91 92 def _find_device(self): 93 if self._device_type == DeviceType.STM32F407: 94 target_vid = 0x1A86 95 target_pid = 0x7523 96 elif self._device_type == DeviceType.ARDUINO_MEGA2560: 97 target_vid = 0x2341 98 target_pid = 0x0043 99 100 ports = serial.tools.list_ports.comports() 101 for port in ports: 102 if port.vid == target_vid and port.pid == target_pid: 103 return port.device 104 return None 105 106 def _get_check_sum(self, data: bytes) -> bytes: 107 if self._checksum_type == CheckSumType.SUM8: 108 check_sum = sum(data) & 0xFF 109 return bytes([check_sum]) 110 elif self._checksum_type == CheckSumType.SUM16: 111 check_sum = sum(data) & 0xFFFF 112 return check_sum.to_bytes(2, byteorder='big') 113 elif self._checksum_type == CheckSumType.XOR8: 114 check_sum = 0 115 for byte in data: 116 check_sum ^= byte 117 return bytes([check_sum]) 118 elif self._checksum_type == CheckSumType.XOR16: 119 check_sum = 0 120 for byte in data: 121 check_sum ^= byte 122 return check_sum.to_bytes(2, byteorder='big') 123 elif self._checksum_type == CheckSumType.CRC8: 124 crc = 0x00 125 polynomial = 0x07 126 for byte in data: 127 crc ^= byte 128 for _ in range(8): 129 if crc & 0x80: 130 crc = (crc << 1) ^ polynomial 131 else: 132 crc <<= 1 133 crc &= 0xFF 134 return bytes([crc]) 135 elif self._checksum_type == CheckSumType.CRC16: 136 crc = 0xFFFF 137 polynomial = 0x8005 138 for byte in data: 139 crc ^= byte 140 for _ in range(8): 141 if crc & 0x0001: 142 crc = (crc >> 1) ^ polynomial 143 else: 144 crc >>= 1 # 否则仅右移 145 return crc.to_bytes(2, byteorder='big') 146 else: 147 raise ValueError("无效的校验和类型") 148 149 def _add_header(self, data: bytes) -> bytes: 150 return bytes(self._header) + data 151 152 def random_bytes(self, length: int) -> bytes: 153 """ 生成随机字节 """ 154 return bytes([random.randint(0, 255) for _ in range(length)]) 155 156 def fixed_bytes(self, byte: int, length: int) -> bytes: 157 """ 生成固定字节 """ 158 return bytes([byte for _ in range(length)]) 159 160 def transmit(self, data: bytes) -> bytes: 161 """ 发送串口数据 """ 162 if self._serial is None: 163 if self._warn: 164 warning(f"节点 {self.name} 串口未初始化,无法发送数据") 165 return 166 if self._checksum_type !=CheckSumType.NONE: 167 data += self._get_check_sum(data) 168 if self._header: 169 data = self._add_header(data) 170 self._serial.write(data) 171 return data 172 173 def receive(self, length: int) -> bytes: 174 """ 175 接收串口数据 176 177 :param len: 接收数据的长度 178 """ 179 if self._serial is None: 180 if self._warn: 181 warning(f"节点 {self.name} 串口未初始化,无法接收数据") 182 return 183 184 in_waiting = self._serial.in_waiting 185 if in_waiting == 0: 186 return None 187 188 if self._header and self._header_flag < len(self._header): 189 target_data = self._header[self._header_flag] 190 data = self._serial.read(1) 191 if ord(data) == target_data: 192 self._header_flag += 1 193 self._receive_data += data 194 else: 195 self._header_flag = 0 196 self._receive_data = bytes() 197 return None 198 199 pending_length = length - len(self._receive_data) 200 self._receive_data += self._serial.read(min(in_waiting, pending_length)) 201 if len(self._receive_data) >= length: 202 data = self._receive_data[:length] 203 self._receive_data = self._receive_data[length:] 204 self._header_flag = 0 205 return data 206 else: 207 return None 208 209 210 # def receive(self, length: int) -> bytes: 211 # """ 212 # 接收串口数据 213 214 # :param len: 接收数据的长度 215 # """ 216 # if self._serial is None: 217 # if self._warn: 218 # warning(f"节点 {self.name} 串口未初始化,无法接收数据") 219 # return 220 221 # in_waiting = self._serial.in_waiting 222 # pending_length = length - len(self._receive_data) 223 # if in_waiting >= 0: 224 # self._receive_data += self._serial.read(min(in_waiting, pending_length)) 225 # if len(self._receive_data) >= length: 226 # data = self._receive_data[:length] 227 # self._receive_data = self._receive_data[length:] 228 # return data 229 # else: 230 # return None 231 # else: 232 # return None 233 234 def check_sum(self, data: bytes) -> bool: 235 """ 校验串口数据 """ 236 if self._checksum_type == CheckSumType.NONE: 237 return True 238 checksum_length = checksum_length_map.get(self._checksum_type) 239 if checksum_length is None: 240 raise ValueError("无效的校验和类型,无法进行校验") 241 242 data_to_check = data[len(self._header):-checksum_length] 243 expected_checksum = data[-checksum_length:] 244 calculated_checksum = self._get_check_sum(data_to_check) 245 246 return calculated_checksum == expected_checksum 247 248 def __del__(self): 249 if self._serial: 250 self._serial.close()
class
DeviceType(enum.Enum):
15class DeviceType(Enum): 16 """ 设备类型枚举 """ 17 STM32F407 = 0 18 """ STM32F407 设备类型 """ 19 ARDUINO_MEGA2560 = 1 20 """ Arduino Mega2560 设备类型 """
设备类型枚举
Inherited Members
- enum.Enum
- name
- value
class
CheckSumType(enum.Enum):
22class CheckSumType(Enum): 23 """ 校验和类型枚举 """ 24 NONE = 0 25 """ 无校验和 """ 26 SUM8 = 1 27 """ SUM8 校验和 """ 28 SUM16 = 2 29 """ SUM16 校验和 """ 30 XOR8 = 3 31 """ XOR8 校验和 """ 32 XOR16 = 4 33 """ XOR16 校验和 """ 34 CRC8 = 5 35 """ CRC8 校验和 """ 36 CRC16 = 6 37 """ CRC16 校验和 """
校验和类型枚举
Inherited Members
- enum.Enum
- name
- value
checksum_length_map =
{<CheckSumType.SUM8: 1>: 1, <CheckSumType.SUM16: 2>: 2, <CheckSumType.XOR8: 3>: 1, <CheckSumType.XOR16: 4>: 2, <CheckSumType.CRC8: 5>: 1, <CheckSumType.CRC16: 6>: 2}
校验和长度映射表
49class SerialIO(Node): 50 """ 串口节点 """ 51 def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True): 52 """ 53 初始化串口节点。 54 55 :param name: 节点名称。 56 :param device_type: 设备类型。 57 :param checksum_type: 校验和类型。 58 :param header: 数据头。 59 :param baudrate: 波特率。 60 :param timeout: 超时时间。 61 """ 62 super().__init__(name) 63 self._device_type = device_type 64 self._checksum_type = checksum_type 65 self._header = header 66 self._header_flag = 0 67 self._device = None 68 self._serial: serial.Serial = None 69 self._baudrate = baudrate 70 self._timeout = timeout 71 72 self._warn = warn 73 self._receive_data = bytes() 74 75 self._initialize() 76 if self._device is None and self._warn: 77 warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试") 78 79 def _update(self, delta) -> None: 80 if self._device is None: 81 self._initialize() 82 return 83 84 def _initialize(self): 85 self._device = self._find_device() 86 if self._device: 87 info(f"节点 {self.name} 初始化时检测到 {self._device_type} 设备,串口为 {self._device},波特率为 {self._baudrate}") 88 self._serial = serial.Serial(self._device, self._baudrate, timeout=self._timeout) 89 # 清空串口缓冲区 90 self._serial.reset_input_buffer() 91 self._serial.reset_output_buffer() 92 93 def _find_device(self): 94 if self._device_type == DeviceType.STM32F407: 95 target_vid = 0x1A86 96 target_pid = 0x7523 97 elif self._device_type == DeviceType.ARDUINO_MEGA2560: 98 target_vid = 0x2341 99 target_pid = 0x0043 100 101 ports = serial.tools.list_ports.comports() 102 for port in ports: 103 if port.vid == target_vid and port.pid == target_pid: 104 return port.device 105 return None 106 107 def _get_check_sum(self, data: bytes) -> bytes: 108 if self._checksum_type == CheckSumType.SUM8: 109 check_sum = sum(data) & 0xFF 110 return bytes([check_sum]) 111 elif self._checksum_type == CheckSumType.SUM16: 112 check_sum = sum(data) & 0xFFFF 113 return check_sum.to_bytes(2, byteorder='big') 114 elif self._checksum_type == CheckSumType.XOR8: 115 check_sum = 0 116 for byte in data: 117 check_sum ^= byte 118 return bytes([check_sum]) 119 elif self._checksum_type == CheckSumType.XOR16: 120 check_sum = 0 121 for byte in data: 122 check_sum ^= byte 123 return check_sum.to_bytes(2, byteorder='big') 124 elif self._checksum_type == CheckSumType.CRC8: 125 crc = 0x00 126 polynomial = 0x07 127 for byte in data: 128 crc ^= byte 129 for _ in range(8): 130 if crc & 0x80: 131 crc = (crc << 1) ^ polynomial 132 else: 133 crc <<= 1 134 crc &= 0xFF 135 return bytes([crc]) 136 elif self._checksum_type == CheckSumType.CRC16: 137 crc = 0xFFFF 138 polynomial = 0x8005 139 for byte in data: 140 crc ^= byte 141 for _ in range(8): 142 if crc & 0x0001: 143 crc = (crc >> 1) ^ polynomial 144 else: 145 crc >>= 1 # 否则仅右移 146 return crc.to_bytes(2, byteorder='big') 147 else: 148 raise ValueError("无效的校验和类型") 149 150 def _add_header(self, data: bytes) -> bytes: 151 return bytes(self._header) + data 152 153 def random_bytes(self, length: int) -> bytes: 154 """ 生成随机字节 """ 155 return bytes([random.randint(0, 255) for _ in range(length)]) 156 157 def fixed_bytes(self, byte: int, length: int) -> bytes: 158 """ 生成固定字节 """ 159 return bytes([byte for _ in range(length)]) 160 161 def transmit(self, data: bytes) -> bytes: 162 """ 发送串口数据 """ 163 if self._serial is None: 164 if self._warn: 165 warning(f"节点 {self.name} 串口未初始化,无法发送数据") 166 return 167 if self._checksum_type !=CheckSumType.NONE: 168 data += self._get_check_sum(data) 169 if self._header: 170 data = self._add_header(data) 171 self._serial.write(data) 172 return data 173 174 def receive(self, length: int) -> bytes: 175 """ 176 接收串口数据 177 178 :param len: 接收数据的长度 179 """ 180 if self._serial is None: 181 if self._warn: 182 warning(f"节点 {self.name} 串口未初始化,无法接收数据") 183 return 184 185 in_waiting = self._serial.in_waiting 186 if in_waiting == 0: 187 return None 188 189 if self._header and self._header_flag < len(self._header): 190 target_data = self._header[self._header_flag] 191 data = self._serial.read(1) 192 if ord(data) == target_data: 193 self._header_flag += 1 194 self._receive_data += data 195 else: 196 self._header_flag = 0 197 self._receive_data = bytes() 198 return None 199 200 pending_length = length - len(self._receive_data) 201 self._receive_data += self._serial.read(min(in_waiting, pending_length)) 202 if len(self._receive_data) >= length: 203 data = self._receive_data[:length] 204 self._receive_data = self._receive_data[length:] 205 self._header_flag = 0 206 return data 207 else: 208 return None 209 210 211 # def receive(self, length: int) -> bytes: 212 # """ 213 # 接收串口数据 214 215 # :param len: 接收数据的长度 216 # """ 217 # if self._serial is None: 218 # if self._warn: 219 # warning(f"节点 {self.name} 串口未初始化,无法接收数据") 220 # return 221 222 # in_waiting = self._serial.in_waiting 223 # pending_length = length - len(self._receive_data) 224 # if in_waiting >= 0: 225 # self._receive_data += self._serial.read(min(in_waiting, pending_length)) 226 # if len(self._receive_data) >= length: 227 # data = self._receive_data[:length] 228 # self._receive_data = self._receive_data[length:] 229 # return data 230 # else: 231 # return None 232 # else: 233 # return None 234 235 def check_sum(self, data: bytes) -> bool: 236 """ 校验串口数据 """ 237 if self._checksum_type == CheckSumType.NONE: 238 return True 239 checksum_length = checksum_length_map.get(self._checksum_type) 240 if checksum_length is None: 241 raise ValueError("无效的校验和类型,无法进行校验") 242 243 data_to_check = data[len(self._header):-checksum_length] 244 expected_checksum = data[-checksum_length:] 245 calculated_checksum = self._get_check_sum(data_to_check) 246 247 return calculated_checksum == expected_checksum 248 249 def __del__(self): 250 if self._serial: 251 self._serial.close()
串口节点
SerialIO( name='SerialIO', device_type=<DeviceType.STM32F407: 0>, checksum_type=<CheckSumType.NONE: 0>, header=[], baudrate=115200, timeout=1.0, warn=True)
51 def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True): 52 """ 53 初始化串口节点。 54 55 :param name: 节点名称。 56 :param device_type: 设备类型。 57 :param checksum_type: 校验和类型。 58 :param header: 数据头。 59 :param baudrate: 波特率。 60 :param timeout: 超时时间。 61 """ 62 super().__init__(name) 63 self._device_type = device_type 64 self._checksum_type = checksum_type 65 self._header = header 66 self._header_flag = 0 67 self._device = None 68 self._serial: serial.Serial = None 69 self._baudrate = baudrate 70 self._timeout = timeout 71 72 self._warn = warn 73 self._receive_data = bytes() 74 75 self._initialize() 76 if self._device is None and self._warn: 77 warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试")
初始化串口节点。
:param name: 节点名称。
:param device_type: 设备类型。
:param checksum_type: 校验和类型。
:param header: 数据头。
:param baudrate: 波特率。
:param timeout: 超时时间。
def
random_bytes(self, length: int) -> bytes:
153 def random_bytes(self, length: int) -> bytes: 154 """ 生成随机字节 """ 155 return bytes([random.randint(0, 255) for _ in range(length)])
生成随机字节
def
fixed_bytes(self, byte: int, length: int) -> bytes:
157 def fixed_bytes(self, byte: int, length: int) -> bytes: 158 """ 生成固定字节 """ 159 return bytes([byte for _ in range(length)])
生成固定字节
def
transmit(self, data: bytes) -> bytes:
161 def transmit(self, data: bytes) -> bytes: 162 """ 发送串口数据 """ 163 if self._serial is None: 164 if self._warn: 165 warning(f"节点 {self.name} 串口未初始化,无法发送数据") 166 return 167 if self._checksum_type !=CheckSumType.NONE: 168 data += self._get_check_sum(data) 169 if self._header: 170 data = self._add_header(data) 171 self._serial.write(data) 172 return data
发送串口数据
def
receive(self, length: int) -> bytes:
174 def receive(self, length: int) -> bytes: 175 """ 176 接收串口数据 177 178 :param len: 接收数据的长度 179 """ 180 if self._serial is None: 181 if self._warn: 182 warning(f"节点 {self.name} 串口未初始化,无法接收数据") 183 return 184 185 in_waiting = self._serial.in_waiting 186 if in_waiting == 0: 187 return None 188 189 if self._header and self._header_flag < len(self._header): 190 target_data = self._header[self._header_flag] 191 data = self._serial.read(1) 192 if ord(data) == target_data: 193 self._header_flag += 1 194 self._receive_data += data 195 else: 196 self._header_flag = 0 197 self._receive_data = bytes() 198 return None 199 200 pending_length = length - len(self._receive_data) 201 self._receive_data += self._serial.read(min(in_waiting, pending_length)) 202 if len(self._receive_data) >= length: 203 data = self._receive_data[:length] 204 self._receive_data = self._receive_data[length:] 205 self._header_flag = 0 206 return data 207 else: 208 return None
接收串口数据
:param len: 接收数据的长度
def
check_sum(self, data: bytes) -> bool:
235 def check_sum(self, data: bytes) -> bool: 236 """ 校验串口数据 """ 237 if self._checksum_type == CheckSumType.NONE: 238 return True 239 checksum_length = checksum_length_map.get(self._checksum_type) 240 if checksum_length is None: 241 raise ValueError("无效的校验和类型,无法进行校验") 242 243 data_to_check = data[len(self._header):-checksum_length] 244 expected_checksum = data[-checksum_length:] 245 calculated_checksum = self._get_check_sum(data_to_check) 246 247 return calculated_checksum == expected_checksum
校验串口数据