robotengine.serial_io

serial_io 是 robotengine 控制硬件串口的节点。

  1"""
  2
  3serial_io 是 robotengine 控制硬件串口的节点。
  4
  5"""
  6
  7from .node import Node
  8import serial.tools.list_ports
  9import serial
 10from enum import Enum
 11import random
 12from robotengine.tools import hex2str, warning, error, info
 13
 14class DeviceType(Enum):
 15    """ 设备类型枚举 """
 16    STM32F407 = 0
 17    """ STM32F407 设备类型 """
 18    ARDUINO_MEGA2560 = 1
 19    """ Arduino Mega2560 设备类型 """
 20
 21class CheckSumType(Enum):
 22    """ 校验和类型枚举 """
 23    NONE = 0
 24    """ 无校验和 """
 25    SUM8 = 1
 26    """ SUM8 校验和 """
 27    SUM16 = 2
 28    """ SUM16 校验和 """
 29    XOR8 = 3
 30    """ XOR8 校验和 """
 31    XOR16 = 4
 32    """ XOR16 校验和 """
 33    CRC8 = 5
 34    """ CRC8 校验和 """
 35    CRC16 = 6
 36    """ CRC16 校验和 """
 37
 38checksum_length_map = {
 39        CheckSumType.SUM8: 1,
 40        CheckSumType.SUM16: 2,
 41        CheckSumType.XOR8: 1,
 42        CheckSumType.XOR16: 2,
 43        CheckSumType.CRC8: 1,
 44        CheckSumType.CRC16: 2
 45    }
 46""" 校验和长度映射表 """
 47
 48class SerialIO(Node):
 49    """ 串口节点 """
 50    def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True):
 51        """ 
 52        初始化串口节点。
 53
 54            :param name: 节点名称。
 55            :param device_type: 设备类型。
 56            :param checksum_type: 校验和类型。
 57            :param header: 数据头。
 58            :param baudrate: 波特率。
 59            :param timeout: 超时时间。
 60        """
 61        super().__init__(name)
 62        self._device_type = device_type
 63        self._checksum_type = checksum_type
 64        self._header = header
 65        self._header_flag = 0
 66        self._device = None
 67        self._serial: serial.Serial = None
 68        self._baudrate = baudrate
 69        self._timeout = timeout
 70
 71        self._warn = warn
 72        self._receive_data = bytes()
 73
 74        self._initialize()
 75        if self._device is None and self._warn:
 76            warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试")
 77
 78    def _update(self, delta) -> None:
 79        if self._device is None:
 80            self._initialize()
 81            return
 82        
 83    def _initialize(self):
 84        self._device = self._find_device()
 85        if self._device:
 86            info(f"节点 {self.name} 初始化时检测到 {self._device_type} 设备,串口为 {self._device},波特率为 {self._baudrate}")
 87            self._serial = serial.Serial(self._device, self._baudrate, timeout=self._timeout)
 88            # 清空串口缓冲区
 89            self._serial.reset_input_buffer()
 90            self._serial.reset_output_buffer()
 91
 92    def _find_device(self):
 93        if self._device_type == DeviceType.STM32F407:
 94            target_vid = 0x1A86
 95            target_pid = 0x7523
 96        elif self._device_type == DeviceType.ARDUINO_MEGA2560:
 97            target_vid = 0x2341
 98            target_pid = 0x0043
 99
100        ports = serial.tools.list_ports.comports()
101        for port in ports:
102            if port.vid == target_vid and port.pid == target_pid:
103                return port.device
104        return None
105    
106    def _get_check_sum(self, data: bytes) -> bytes:
107        if self._checksum_type == CheckSumType.SUM8:
108            check_sum = sum(data) & 0xFF
109            return bytes([check_sum])
110        elif self._checksum_type == CheckSumType.SUM16:
111            check_sum = sum(data) & 0xFFFF
112            return check_sum.to_bytes(2, byteorder='big')
113        elif self._checksum_type == CheckSumType.XOR8:
114            check_sum = 0
115            for byte in data:
116                check_sum ^= byte
117            return bytes([check_sum])
118        elif self._checksum_type == CheckSumType.XOR16:
119            check_sum = 0
120            for byte in data:
121                check_sum ^= byte
122            return check_sum.to_bytes(2, byteorder='big')
123        elif self._checksum_type == CheckSumType.CRC8:
124            crc = 0x00
125            polynomial = 0x07
126            for byte in data:
127                crc ^= byte
128                for _ in range(8):
129                    if crc & 0x80:
130                        crc = (crc << 1) ^ polynomial
131                    else:
132                        crc <<= 1
133                    crc &= 0xFF
134            return bytes([crc])
135        elif self._checksum_type == CheckSumType.CRC16:
136            crc = 0xFFFF
137            polynomial = 0x8005
138            for byte in data:
139                crc ^= byte
140                for _ in range(8):
141                    if crc & 0x0001:
142                        crc = (crc >> 1) ^ polynomial
143                    else:
144                        crc >>= 1  # 否则仅右移
145            return crc.to_bytes(2, byteorder='big')
146        else:
147            raise ValueError("无效的校验和类型")
148            
149    def _add_header(self, data: bytes) -> bytes:
150        return bytes(self._header) + data
151    
152    def random_bytes(self, length: int) -> bytes:
153        """ 生成随机字节 """
154        return bytes([random.randint(0, 255) for _ in range(length)])
155    
156    def fixed_bytes(self, byte: int, length: int) -> bytes:
157        """ 生成固定字节 """
158        return bytes([byte for _ in range(length)])
159    
160    def transmit(self, data: bytes) -> bytes:
161        """ 发送串口数据 """
162        if self._serial is None:
163            if self._warn:
164                warning(f"节点 {self.name} 串口未初始化,无法发送数据")
165            return
166        if self._checksum_type !=CheckSumType.NONE:
167            data += self._get_check_sum(data)
168        if self._header:
169            data = self._add_header(data)
170        self._serial.write(data)
171        return data
172    
173    def receive(self, length: int) -> bytes:
174        """ 
175        接收串口数据 
176        
177            :param len: 接收数据的长度
178        """
179        if self._serial is None:
180            if self._warn:
181                warning(f"节点 {self.name} 串口未初始化,无法接收数据")
182            return
183        
184        in_waiting = self._serial.in_waiting
185        if in_waiting == 0:
186            return None
187        
188        if self._header and self._header_flag < len(self._header):
189            target_data = self._header[self._header_flag]
190            data = self._serial.read(1)
191            if ord(data) == target_data:
192                self._header_flag += 1
193                self._receive_data += data
194            else:
195                self._header_flag = 0
196                self._receive_data = bytes()
197            return None
198        
199        pending_length = length - len(self._receive_data)
200        self._receive_data += self._serial.read(min(in_waiting, pending_length))
201        if len(self._receive_data) >= length:
202            data = self._receive_data[:length]
203            self._receive_data = self._receive_data[length:]
204            self._header_flag = 0
205            return data
206        else:
207            return None
208        
209            
210    # def receive(self, length: int) -> bytes:
211    #     """ 
212    #     接收串口数据 
213        
214    #         :param len: 接收数据的长度
215    #     """
216    #     if self._serial is None:
217    #         if self._warn:
218    #             warning(f"节点 {self.name} 串口未初始化,无法接收数据")
219    #         return
220        
221    #     in_waiting = self._serial.in_waiting
222    #     pending_length = length - len(self._receive_data)
223    #     if in_waiting >= 0:
224    #         self._receive_data += self._serial.read(min(in_waiting, pending_length))
225    #         if len(self._receive_data) >= length:
226    #             data = self._receive_data[:length]
227    #             self._receive_data = self._receive_data[length:]
228    #             return data
229    #         else:
230    #             return None
231    #     else:
232    #         return None
233        
234    def check_sum(self, data: bytes) -> bool:
235        """ 校验串口数据 """
236        if self._checksum_type == CheckSumType.NONE:
237            return True
238        checksum_length = checksum_length_map.get(self._checksum_type)
239        if checksum_length is None:
240            raise ValueError("无效的校验和类型,无法进行校验")
241
242        data_to_check = data[len(self._header):-checksum_length]
243        expected_checksum = data[-checksum_length:]
244        calculated_checksum = self._get_check_sum(data_to_check)
245
246        return calculated_checksum == expected_checksum
247
248    def __del__(self):
249        if self._serial:
250            self._serial.close()
class DeviceType(enum.Enum):
15class DeviceType(Enum):
16    """ 设备类型枚举 """
17    STM32F407 = 0
18    """ STM32F407 设备类型 """
19    ARDUINO_MEGA2560 = 1
20    """ Arduino Mega2560 设备类型 """

设备类型枚举

STM32F407 = <DeviceType.STM32F407: 0>

STM32F407 设备类型

ARDUINO_MEGA2560 = <DeviceType.ARDUINO_MEGA2560: 1>

Arduino Mega2560 设备类型

Inherited Members
enum.Enum
name
value
class CheckSumType(enum.Enum):
22class CheckSumType(Enum):
23    """ 校验和类型枚举 """
24    NONE = 0
25    """ 无校验和 """
26    SUM8 = 1
27    """ SUM8 校验和 """
28    SUM16 = 2
29    """ SUM16 校验和 """
30    XOR8 = 3
31    """ XOR8 校验和 """
32    XOR16 = 4
33    """ XOR16 校验和 """
34    CRC8 = 5
35    """ CRC8 校验和 """
36    CRC16 = 6
37    """ CRC16 校验和 """

校验和类型枚举

NONE = <CheckSumType.NONE: 0>

无校验和

SUM8 = <CheckSumType.SUM8: 1>

SUM8 校验和

SUM16 = <CheckSumType.SUM16: 2>

SUM16 校验和

XOR8 = <CheckSumType.XOR8: 3>

XOR8 校验和

XOR16 = <CheckSumType.XOR16: 4>

XOR16 校验和

CRC8 = <CheckSumType.CRC8: 5>

CRC8 校验和

CRC16 = <CheckSumType.CRC16: 6>

CRC16 校验和

Inherited Members
enum.Enum
name
value
checksum_length_map = {<CheckSumType.SUM8: 1>: 1, <CheckSumType.SUM16: 2>: 2, <CheckSumType.XOR8: 3>: 1, <CheckSumType.XOR16: 4>: 2, <CheckSumType.CRC8: 5>: 1, <CheckSumType.CRC16: 6>: 2}

校验和长度映射表

class SerialIO(robotengine.node.Node):
 49class SerialIO(Node):
 50    """ 串口节点 """
 51    def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True):
 52        """ 
 53        初始化串口节点。
 54
 55            :param name: 节点名称。
 56            :param device_type: 设备类型。
 57            :param checksum_type: 校验和类型。
 58            :param header: 数据头。
 59            :param baudrate: 波特率。
 60            :param timeout: 超时时间。
 61        """
 62        super().__init__(name)
 63        self._device_type = device_type
 64        self._checksum_type = checksum_type
 65        self._header = header
 66        self._header_flag = 0
 67        self._device = None
 68        self._serial: serial.Serial = None
 69        self._baudrate = baudrate
 70        self._timeout = timeout
 71
 72        self._warn = warn
 73        self._receive_data = bytes()
 74
 75        self._initialize()
 76        if self._device is None and self._warn:
 77            warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试")
 78
 79    def _update(self, delta) -> None:
 80        if self._device is None:
 81            self._initialize()
 82            return
 83        
 84    def _initialize(self):
 85        self._device = self._find_device()
 86        if self._device:
 87            info(f"节点 {self.name} 初始化时检测到 {self._device_type} 设备,串口为 {self._device},波特率为 {self._baudrate}")
 88            self._serial = serial.Serial(self._device, self._baudrate, timeout=self._timeout)
 89            # 清空串口缓冲区
 90            self._serial.reset_input_buffer()
 91            self._serial.reset_output_buffer()
 92
 93    def _find_device(self):
 94        if self._device_type == DeviceType.STM32F407:
 95            target_vid = 0x1A86
 96            target_pid = 0x7523
 97        elif self._device_type == DeviceType.ARDUINO_MEGA2560:
 98            target_vid = 0x2341
 99            target_pid = 0x0043
100
101        ports = serial.tools.list_ports.comports()
102        for port in ports:
103            if port.vid == target_vid and port.pid == target_pid:
104                return port.device
105        return None
106    
107    def _get_check_sum(self, data: bytes) -> bytes:
108        if self._checksum_type == CheckSumType.SUM8:
109            check_sum = sum(data) & 0xFF
110            return bytes([check_sum])
111        elif self._checksum_type == CheckSumType.SUM16:
112            check_sum = sum(data) & 0xFFFF
113            return check_sum.to_bytes(2, byteorder='big')
114        elif self._checksum_type == CheckSumType.XOR8:
115            check_sum = 0
116            for byte in data:
117                check_sum ^= byte
118            return bytes([check_sum])
119        elif self._checksum_type == CheckSumType.XOR16:
120            check_sum = 0
121            for byte in data:
122                check_sum ^= byte
123            return check_sum.to_bytes(2, byteorder='big')
124        elif self._checksum_type == CheckSumType.CRC8:
125            crc = 0x00
126            polynomial = 0x07
127            for byte in data:
128                crc ^= byte
129                for _ in range(8):
130                    if crc & 0x80:
131                        crc = (crc << 1) ^ polynomial
132                    else:
133                        crc <<= 1
134                    crc &= 0xFF
135            return bytes([crc])
136        elif self._checksum_type == CheckSumType.CRC16:
137            crc = 0xFFFF
138            polynomial = 0x8005
139            for byte in data:
140                crc ^= byte
141                for _ in range(8):
142                    if crc & 0x0001:
143                        crc = (crc >> 1) ^ polynomial
144                    else:
145                        crc >>= 1  # 否则仅右移
146            return crc.to_bytes(2, byteorder='big')
147        else:
148            raise ValueError("无效的校验和类型")
149            
150    def _add_header(self, data: bytes) -> bytes:
151        return bytes(self._header) + data
152    
153    def random_bytes(self, length: int) -> bytes:
154        """ 生成随机字节 """
155        return bytes([random.randint(0, 255) for _ in range(length)])
156    
157    def fixed_bytes(self, byte: int, length: int) -> bytes:
158        """ 生成固定字节 """
159        return bytes([byte for _ in range(length)])
160    
161    def transmit(self, data: bytes) -> bytes:
162        """ 发送串口数据 """
163        if self._serial is None:
164            if self._warn:
165                warning(f"节点 {self.name} 串口未初始化,无法发送数据")
166            return
167        if self._checksum_type !=CheckSumType.NONE:
168            data += self._get_check_sum(data)
169        if self._header:
170            data = self._add_header(data)
171        self._serial.write(data)
172        return data
173    
174    def receive(self, length: int) -> bytes:
175        """ 
176        接收串口数据 
177        
178            :param len: 接收数据的长度
179        """
180        if self._serial is None:
181            if self._warn:
182                warning(f"节点 {self.name} 串口未初始化,无法接收数据")
183            return
184        
185        in_waiting = self._serial.in_waiting
186        if in_waiting == 0:
187            return None
188        
189        if self._header and self._header_flag < len(self._header):
190            target_data = self._header[self._header_flag]
191            data = self._serial.read(1)
192            if ord(data) == target_data:
193                self._header_flag += 1
194                self._receive_data += data
195            else:
196                self._header_flag = 0
197                self._receive_data = bytes()
198            return None
199        
200        pending_length = length - len(self._receive_data)
201        self._receive_data += self._serial.read(min(in_waiting, pending_length))
202        if len(self._receive_data) >= length:
203            data = self._receive_data[:length]
204            self._receive_data = self._receive_data[length:]
205            self._header_flag = 0
206            return data
207        else:
208            return None
209        
210            
211    # def receive(self, length: int) -> bytes:
212    #     """ 
213    #     接收串口数据 
214        
215    #         :param len: 接收数据的长度
216    #     """
217    #     if self._serial is None:
218    #         if self._warn:
219    #             warning(f"节点 {self.name} 串口未初始化,无法接收数据")
220    #         return
221        
222    #     in_waiting = self._serial.in_waiting
223    #     pending_length = length - len(self._receive_data)
224    #     if in_waiting >= 0:
225    #         self._receive_data += self._serial.read(min(in_waiting, pending_length))
226    #         if len(self._receive_data) >= length:
227    #             data = self._receive_data[:length]
228    #             self._receive_data = self._receive_data[length:]
229    #             return data
230    #         else:
231    #             return None
232    #     else:
233    #         return None
234        
235    def check_sum(self, data: bytes) -> bool:
236        """ 校验串口数据 """
237        if self._checksum_type == CheckSumType.NONE:
238            return True
239        checksum_length = checksum_length_map.get(self._checksum_type)
240        if checksum_length is None:
241            raise ValueError("无效的校验和类型,无法进行校验")
242
243        data_to_check = data[len(self._header):-checksum_length]
244        expected_checksum = data[-checksum_length:]
245        calculated_checksum = self._get_check_sum(data_to_check)
246
247        return calculated_checksum == expected_checksum
248
249    def __del__(self):
250        if self._serial:
251            self._serial.close()

串口节点

SerialIO( name='SerialIO', device_type=<DeviceType.STM32F407: 0>, checksum_type=<CheckSumType.NONE: 0>, header=[], baudrate=115200, timeout=1.0, warn=True)
51    def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True):
52        """ 
53        初始化串口节点。
54
55            :param name: 节点名称。
56            :param device_type: 设备类型。
57            :param checksum_type: 校验和类型。
58            :param header: 数据头。
59            :param baudrate: 波特率。
60            :param timeout: 超时时间。
61        """
62        super().__init__(name)
63        self._device_type = device_type
64        self._checksum_type = checksum_type
65        self._header = header
66        self._header_flag = 0
67        self._device = None
68        self._serial: serial.Serial = None
69        self._baudrate = baudrate
70        self._timeout = timeout
71
72        self._warn = warn
73        self._receive_data = bytes()
74
75        self._initialize()
76        if self._device is None and self._warn:
77            warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试")

初始化串口节点。

:param name: 节点名称。
:param device_type: 设备类型。
:param checksum_type: 校验和类型。
:param header: 数据头。
:param baudrate: 波特率。
:param timeout: 超时时间。
:param warn: 是否在未检测到设备或串口未初始化时输出警告。
def random_bytes(self, length: int) -> bytes:
153    def random_bytes(self, length: int) -> bytes:
154        """ 生成随机字节 """
155        return bytes([random.randint(0, 255) for _ in range(length)])

生成随机字节

def fixed_bytes(self, byte: int, length: int) -> bytes:
157    def fixed_bytes(self, byte: int, length: int) -> bytes:
158        """ 生成固定字节 """
159        return bytes([byte for _ in range(length)])

生成固定字节

def transmit(self, data: bytes) -> bytes:
161    def transmit(self, data: bytes) -> bytes:
162        """ 发送串口数据 """
163        if self._serial is None:
164            if self._warn:
165                warning(f"节点 {self.name} 串口未初始化,无法发送数据")
166            return
167        if self._checksum_type !=CheckSumType.NONE:
168            data += self._get_check_sum(data)
169        if self._header:
170            data = self._add_header(data)
171        self._serial.write(data)
172        return data

发送串口数据

def receive(self, length: int) -> bytes:
174    def receive(self, length: int) -> bytes:
175        """ 
176        接收串口数据 
177        
178            :param len: 接收数据的长度
179        """
180        if self._serial is None:
181            if self._warn:
182                warning(f"节点 {self.name} 串口未初始化,无法接收数据")
183            return
184        
185        in_waiting = self._serial.in_waiting
186        if in_waiting == 0:
187            return None
188        
189        if self._header and self._header_flag < len(self._header):
190            target_data = self._header[self._header_flag]
191            data = self._serial.read(1)
192            if ord(data) == target_data:
193                self._header_flag += 1
194                self._receive_data += data
195            else:
196                self._header_flag = 0
197                self._receive_data = bytes()
198            return None
199        
200        pending_length = length - len(self._receive_data)
201        self._receive_data += self._serial.read(min(in_waiting, pending_length))
202        if len(self._receive_data) >= length:
203            data = self._receive_data[:length]
204            self._receive_data = self._receive_data[length:]
205            self._header_flag = 0
206            return data
207        else:
208            return None

接收串口数据

:param length: 接收数据的长度
def check_sum(self, data: bytes) -> bool:
235    def check_sum(self, data: bytes) -> bool:
236        """ 校验串口数据 """
237        if self._checksum_type == CheckSumType.NONE:
238            return True
239        checksum_length = checksum_length_map.get(self._checksum_type)
240        if checksum_length is None:
241            raise ValueError("无效的校验和类型,无法进行校验")
242
243        data_to_check = data[len(self._header):-checksum_length]
244        expected_checksum = data[-checksum_length:]
245        calculated_checksum = self._get_check_sum(data_to_check)
246
247        return calculated_checksum == expected_checksum

校验串口数据