robotengine.serial_io

serial_io 是 robotengine 控制硬件串口的节点。

  1"""
  2
  3serial_io 是 robotengine 控制硬件串口的节点。
  4
  5"""
  6
import random
from enum import Enum
from typing import Optional

import serial
import serial.tools.list_ports

from robotengine.tools import hex2str, warning, error, info
from .node import Node
 13
 14class DeviceType(Enum):
 15    """ 设备类型枚举 """
 16    STM32F407 = 0
 17    """ STM32F407 设备类型 """
 18    ARDUINO_MEGA2560 = 1
 19    """ Arduino Mega2560 设备类型 """
 20
 21class CheckSumType(Enum):
 22    """ 校验和类型枚举 """
 23    NONE = 0
 24    """ 无校验和 """
 25    SUM8 = 1
 26    """ SUM8 校验和 """
 27    SUM16 = 2
 28    """ SUM16 校验和 """
 29    XOR8 = 3
 30    """ XOR8 校验和 """
 31    XOR16 = 4
 32    """ XOR16 校验和 """
 33    CRC8 = 5
 34    """ CRC8 校验和 """
 35    CRC16 = 6
 36    """ CRC16 校验和 """
 37
 38checksum_length_map = {
 39        CheckSumType.SUM8: 1,
 40        CheckSumType.SUM16: 2,
 41        CheckSumType.XOR8: 1,
 42        CheckSumType.XOR16: 2,
 43        CheckSumType.CRC8: 1,
 44        CheckSumType.CRC16: 2
 45    }
 46""" 校验和长度映射表 """
 47
 48class SerialIO(Node):
 49    """ 串口节点 """
 50    def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True):
 51        """ 
 52        初始化串口节点。
 53
 54            :param name: 节点名称。
 55            :param device_type: 设备类型。
 56            :param checksum_type: 校验和类型。
 57            :param header: 数据头。
 58            :param baudrate: 波特率。
 59            :param timeout: 超时时间。
 60        """
 61        super().__init__(name)
 62        self._device_type = device_type
 63        self._checksum_type = checksum_type
 64        self._header = header
 65        self._header_flag = 0
 66        self._device = None
 67        self._serial: serial.Serial = None
 68        self._baudrate = baudrate
 69        self._timeout = timeout
 70
 71        self._warn = warn
 72        self._receive_data = bytes()
 73
 74        self._initialize()
 75        if self._device is None and self._warn:
 76            warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试")
 77
 78    def _update(self, delta) -> None:
 79        if self._device is None:
 80            self._initialize()
 81            return
 82        
 83    def _initialize(self):
 84        self._device = self._find_device()
 85        if self._device:
 86            try:
 87                info(f"节点 {self.name} 初始化时检测到 {self._device_type} 设备,串口为 {self._device},波特率为 {self._baudrate}")
 88                self._serial = serial.Serial(self._device, self._baudrate, timeout=self._timeout)
 89                # 清空串口缓冲区
 90                self._serial.reset_input_buffer()
 91                self._serial.reset_output_buffer()
 92            except serial.SerialException as e:
 93                error(f"节点 {self.name} 尝试打开串口 {self._device} 失败,错误信息为 {e}")
 94                self._device = None
 95                self._serial = None
 96
 97    def _find_device(self):
 98        if self._device_type == DeviceType.STM32F407:
 99            target_vid = 0x1A86
100            target_pid = 0x7523
101        elif self._device_type == DeviceType.ARDUINO_MEGA2560:
102            target_vid = 0x2341
103            target_pid = 0x0043
104
105        ports = serial.tools.list_ports.comports()
106        for port in ports:
107            if port.vid == target_vid and port.pid == target_pid:
108                return port.device
109        return None
110    
111    def _get_check_sum(self, data: bytes) -> bytes:
112        if self._checksum_type == CheckSumType.SUM8:
113            check_sum = sum(data) & 0xFF
114            return bytes([check_sum])
115        elif self._checksum_type == CheckSumType.SUM16:
116            check_sum = sum(data) & 0xFFFF
117            return check_sum.to_bytes(2, byteorder='big')
118        elif self._checksum_type == CheckSumType.XOR8:
119            check_sum = 0
120            for byte in data:
121                check_sum ^= byte
122            return bytes([check_sum])
123        elif self._checksum_type == CheckSumType.XOR16:
124            check_sum = 0
125            for byte in data:
126                check_sum ^= byte
127            return check_sum.to_bytes(2, byteorder='big')
128        elif self._checksum_type == CheckSumType.CRC8:
129            crc = 0x00
130            polynomial = 0x07
131            for byte in data:
132                crc ^= byte
133                for _ in range(8):
134                    if crc & 0x80:
135                        crc = (crc << 1) ^ polynomial
136                    else:
137                        crc <<= 1
138                    crc &= 0xFF
139            return bytes([crc])
140        elif self._checksum_type == CheckSumType.CRC16:
141            crc = 0xFFFF
142            polynomial = 0x8005
143            for byte in data:
144                crc ^= byte
145                for _ in range(8):
146                    if crc & 0x0001:
147                        crc = (crc >> 1) ^ polynomial
148                    else:
149                        crc >>= 1  # 否则仅右移
150            return crc.to_bytes(2, byteorder='big')
151        else:
152            raise ValueError("无效的校验和类型")
153            
154    def _add_header(self, data: bytes) -> bytes:
155        return bytes(self._header) + data
156    
157    def random_bytes(self, length: int) -> bytes:
158        """ 生成随机字节 """
159        return bytes([random.randint(0, 255) for _ in range(length)])
160    
161    def fixed_bytes(self, byte: int, length: int) -> bytes:
162        """ 生成固定字节 """
163        return bytes([byte for _ in range(length)])
164    
165    def transmit(self, data: bytes) -> bytes:
166        """ 发送串口数据 """
167        if self._serial is None:
168            if self._warn:
169                warning(f"节点 {self.name} 串口未初始化,无法发送数据")
170            return
171        if self._checksum_type !=CheckSumType.NONE:
172            data += self._get_check_sum(data)
173        if self._header:
174            data = self._add_header(data)
175        self._serial.write(data)
176        return data
177    
178    def receive(self, length: int) -> bytes:
179        """ 
180        接收串口数据 
181        
182            :param len: 接收数据的长度
183        """
184        if self._serial is None:
185            if self._warn:
186                warning(f"节点 {self.name} 串口未初始化,无法接收数据")
187            return
188        
189        in_waiting = self._serial.in_waiting
190        if in_waiting == 0:
191            return None
192        
193        if self._header and self._header_flag < len(self._header):
194            target_data = self._header[self._header_flag]
195            data = self._serial.read(1)
196            if ord(data) == target_data:
197                self._header_flag += 1
198                self._receive_data += data
199            else:
200                self._header_flag = 0
201                self._receive_data = bytes()
202            return None
203        
204        pending_length = length - len(self._receive_data)
205        self._receive_data += self._serial.read(min(in_waiting, pending_length))
206        if len(self._receive_data) >= length:
207            data = self._receive_data[:length]
208            self._receive_data = self._receive_data[length:]
209            self._header_flag = 0
210            return data
211        else:
212            return None
213        
214            
215    # def receive(self, length: int) -> bytes:
216    #     """ 
217    #     接收串口数据 
218        
219    #         :param len: 接收数据的长度
220    #     """
221    #     if self._serial is None:
222    #         if self._warn:
223    #             warning(f"节点 {self.name} 串口未初始化,无法接收数据")
224    #         return
225        
226    #     in_waiting = self._serial.in_waiting
227    #     pending_length = length - len(self._receive_data)
228    #     if in_waiting >= 0:
229    #         self._receive_data += self._serial.read(min(in_waiting, pending_length))
230    #         if len(self._receive_data) >= length:
231    #             data = self._receive_data[:length]
232    #             self._receive_data = self._receive_data[length:]
233    #             return data
234    #         else:
235    #             return None
236    #     else:
237    #         return None
238        
239    def check_sum(self, data: bytes) -> bool:
240        """ 校验串口数据 """
241        if self._checksum_type == CheckSumType.NONE:
242            return True
243        checksum_length = checksum_length_map.get(self._checksum_type)
244        if checksum_length is None:
245            raise ValueError("无效的校验和类型,无法进行校验")
246
247        data_to_check = data[len(self._header):-checksum_length]
248        expected_checksum = data[-checksum_length:]
249        calculated_checksum = self._get_check_sum(data_to_check)
250
251        return calculated_checksum == expected_checksum
252
253    def __del__(self):
254        if self._serial:
255            self._serial.close()
class DeviceType(enum.Enum):
class DeviceType(Enum):
    """Hardware boards that a serial node can look for."""

    STM32F407 = 0
    """STM32F407 board."""

    ARDUINO_MEGA2560 = 1
    """Arduino Mega2560 board."""

设备类型枚举

STM32F407 = <DeviceType.STM32F407: 0>

STM32F407 设备类型

ARDUINO_MEGA2560 = <DeviceType.ARDUINO_MEGA2560: 1>

Arduino Mega2560 设备类型

Inherited Members
enum.Enum
name
value
class CheckSumType(enum.Enum):
class CheckSumType(Enum):
    """Checksum algorithms that can be attached to a serial frame."""

    NONE = 0
    """No checksum."""

    SUM8 = 1
    """SUM8 checksum."""

    SUM16 = 2
    """SUM16 checksum."""

    XOR8 = 3
    """XOR8 checksum."""

    XOR16 = 4
    """XOR16 checksum."""

    CRC8 = 5
    """CRC8 checksum."""

    CRC16 = 6
    """CRC16 checksum."""

校验和类型枚举

NONE = <CheckSumType.NONE: 0>

无校验和

SUM8 = <CheckSumType.SUM8: 1>

SUM8 校验和

SUM16 = <CheckSumType.SUM16: 2>

SUM16 校验和

XOR8 = <CheckSumType.XOR8: 3>

XOR8 校验和

XOR16 = <CheckSumType.XOR16: 4>

XOR16 校验和

CRC8 = <CheckSumType.CRC8: 5>

CRC8 校验和

CRC16 = <CheckSumType.CRC16: 6>

CRC16 校验和

Inherited Members
enum.Enum
name
value
checksum_length_map = {<CheckSumType.SUM8: 1>: 1, <CheckSumType.SUM16: 2>: 2, <CheckSumType.XOR8: 3>: 1, <CheckSumType.XOR16: 4>: 2, <CheckSumType.CRC8: 5>: 1, <CheckSumType.CRC16: 6>: 2}

校验和长度映射表

class SerialIO(robotengine.node.Node):
 49class SerialIO(Node):
 50    """ 串口节点 """
 51    def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True):
 52        """ 
 53        初始化串口节点。
 54
 55            :param name: 节点名称。
 56            :param device_type: 设备类型。
 57            :param checksum_type: 校验和类型。
 58            :param header: 数据头。
 59            :param baudrate: 波特率。
 60            :param timeout: 超时时间。
 61        """
 62        super().__init__(name)
 63        self._device_type = device_type
 64        self._checksum_type = checksum_type
 65        self._header = header
 66        self._header_flag = 0
 67        self._device = None
 68        self._serial: serial.Serial = None
 69        self._baudrate = baudrate
 70        self._timeout = timeout
 71
 72        self._warn = warn
 73        self._receive_data = bytes()
 74
 75        self._initialize()
 76        if self._device is None and self._warn:
 77            warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试")
 78
 79    def _update(self, delta) -> None:
 80        if self._device is None:
 81            self._initialize()
 82            return
 83        
 84    def _initialize(self):
 85        self._device = self._find_device()
 86        if self._device:
 87            try:
 88                info(f"节点 {self.name} 初始化时检测到 {self._device_type} 设备,串口为 {self._device},波特率为 {self._baudrate}")
 89                self._serial = serial.Serial(self._device, self._baudrate, timeout=self._timeout)
 90                # 清空串口缓冲区
 91                self._serial.reset_input_buffer()
 92                self._serial.reset_output_buffer()
 93            except serial.SerialException as e:
 94                error(f"节点 {self.name} 尝试打开串口 {self._device} 失败,错误信息为 {e}")
 95                self._device = None
 96                self._serial = None
 97
 98    def _find_device(self):
 99        if self._device_type == DeviceType.STM32F407:
100            target_vid = 0x1A86
101            target_pid = 0x7523
102        elif self._device_type == DeviceType.ARDUINO_MEGA2560:
103            target_vid = 0x2341
104            target_pid = 0x0043
105
106        ports = serial.tools.list_ports.comports()
107        for port in ports:
108            if port.vid == target_vid and port.pid == target_pid:
109                return port.device
110        return None
111    
112    def _get_check_sum(self, data: bytes) -> bytes:
113        if self._checksum_type == CheckSumType.SUM8:
114            check_sum = sum(data) & 0xFF
115            return bytes([check_sum])
116        elif self._checksum_type == CheckSumType.SUM16:
117            check_sum = sum(data) & 0xFFFF
118            return check_sum.to_bytes(2, byteorder='big')
119        elif self._checksum_type == CheckSumType.XOR8:
120            check_sum = 0
121            for byte in data:
122                check_sum ^= byte
123            return bytes([check_sum])
124        elif self._checksum_type == CheckSumType.XOR16:
125            check_sum = 0
126            for byte in data:
127                check_sum ^= byte
128            return check_sum.to_bytes(2, byteorder='big')
129        elif self._checksum_type == CheckSumType.CRC8:
130            crc = 0x00
131            polynomial = 0x07
132            for byte in data:
133                crc ^= byte
134                for _ in range(8):
135                    if crc & 0x80:
136                        crc = (crc << 1) ^ polynomial
137                    else:
138                        crc <<= 1
139                    crc &= 0xFF
140            return bytes([crc])
141        elif self._checksum_type == CheckSumType.CRC16:
142            crc = 0xFFFF
143            polynomial = 0x8005
144            for byte in data:
145                crc ^= byte
146                for _ in range(8):
147                    if crc & 0x0001:
148                        crc = (crc >> 1) ^ polynomial
149                    else:
150                        crc >>= 1  # 否则仅右移
151            return crc.to_bytes(2, byteorder='big')
152        else:
153            raise ValueError("无效的校验和类型")
154            
155    def _add_header(self, data: bytes) -> bytes:
156        return bytes(self._header) + data
157    
158    def random_bytes(self, length: int) -> bytes:
159        """ 生成随机字节 """
160        return bytes([random.randint(0, 255) for _ in range(length)])
161    
162    def fixed_bytes(self, byte: int, length: int) -> bytes:
163        """ 生成固定字节 """
164        return bytes([byte for _ in range(length)])
165    
166    def transmit(self, data: bytes) -> bytes:
167        """ 发送串口数据 """
168        if self._serial is None:
169            if self._warn:
170                warning(f"节点 {self.name} 串口未初始化,无法发送数据")
171            return
172        if self._checksum_type !=CheckSumType.NONE:
173            data += self._get_check_sum(data)
174        if self._header:
175            data = self._add_header(data)
176        self._serial.write(data)
177        return data
178    
179    def receive(self, length: int) -> bytes:
180        """ 
181        接收串口数据 
182        
183            :param len: 接收数据的长度
184        """
185        if self._serial is None:
186            if self._warn:
187                warning(f"节点 {self.name} 串口未初始化,无法接收数据")
188            return
189        
190        in_waiting = self._serial.in_waiting
191        if in_waiting == 0:
192            return None
193        
194        if self._header and self._header_flag < len(self._header):
195            target_data = self._header[self._header_flag]
196            data = self._serial.read(1)
197            if ord(data) == target_data:
198                self._header_flag += 1
199                self._receive_data += data
200            else:
201                self._header_flag = 0
202                self._receive_data = bytes()
203            return None
204        
205        pending_length = length - len(self._receive_data)
206        self._receive_data += self._serial.read(min(in_waiting, pending_length))
207        if len(self._receive_data) >= length:
208            data = self._receive_data[:length]
209            self._receive_data = self._receive_data[length:]
210            self._header_flag = 0
211            return data
212        else:
213            return None
214        
215            
216    # def receive(self, length: int) -> bytes:
217    #     """ 
218    #     接收串口数据 
219        
220    #         :param len: 接收数据的长度
221    #     """
222    #     if self._serial is None:
223    #         if self._warn:
224    #             warning(f"节点 {self.name} 串口未初始化,无法接收数据")
225    #         return
226        
227    #     in_waiting = self._serial.in_waiting
228    #     pending_length = length - len(self._receive_data)
229    #     if in_waiting >= 0:
230    #         self._receive_data += self._serial.read(min(in_waiting, pending_length))
231    #         if len(self._receive_data) >= length:
232    #             data = self._receive_data[:length]
233    #             self._receive_data = self._receive_data[length:]
234    #             return data
235    #         else:
236    #             return None
237    #     else:
238    #         return None
239        
240    def check_sum(self, data: bytes) -> bool:
241        """ 校验串口数据 """
242        if self._checksum_type == CheckSumType.NONE:
243            return True
244        checksum_length = checksum_length_map.get(self._checksum_type)
245        if checksum_length is None:
246            raise ValueError("无效的校验和类型,无法进行校验")
247
248        data_to_check = data[len(self._header):-checksum_length]
249        expected_checksum = data[-checksum_length:]
250        calculated_checksum = self._get_check_sum(data_to_check)
251
252        return calculated_checksum == expected_checksum
253
254    def __del__(self):
255        if self._serial:
256            self._serial.close()

串口节点

SerialIO( name='SerialIO', device_type=<DeviceType.STM32F407: 0>, checksum_type=<CheckSumType.NONE: 0>, header=[], baudrate=115200, timeout=1.0, warn=True)
51    def __init__(self, name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.NONE, header=[], baudrate=115200, timeout=1.0, warn=True):
52        """ 
53        初始化串口节点。
54
55            :param name: 节点名称。
56            :param device_type: 设备类型。
57            :param checksum_type: 校验和类型。
58            :param header: 数据头。
59            :param baudrate: 波特率。
60            :param timeout: 超时时间。
61        """
62        super().__init__(name)
63        self._device_type = device_type
64        self._checksum_type = checksum_type
65        self._header = header
66        self._header_flag = 0
67        self._device = None
68        self._serial: serial.Serial = None
69        self._baudrate = baudrate
70        self._timeout = timeout
71
72        self._warn = warn
73        self._receive_data = bytes()
74
75        self._initialize()
76        if self._device is None and self._warn:
77            warning(f"节点 {self.name} 初始化时未检测到 {self._device_type} 设备,将在内部更新中继续尝试")

初始化串口节点。

:param name: 节点名称。
:param device_type: 设备类型。
:param checksum_type: 校验和类型。
:param header: 数据头。
:param baudrate: 波特率。
:param timeout: 超时时间。
def random_bytes(self, length: int) -> bytes:
158    def random_bytes(self, length: int) -> bytes:
159        """ 生成随机字节 """
160        return bytes([random.randint(0, 255) for _ in range(length)])

生成随机字节

def fixed_bytes(self, byte: int, length: int) -> bytes:
162    def fixed_bytes(self, byte: int, length: int) -> bytes:
163        """ 生成固定字节 """
164        return bytes([byte for _ in range(length)])

生成固定字节

def transmit(self, data: bytes) -> bytes:
166    def transmit(self, data: bytes) -> bytes:
167        """ 发送串口数据 """
168        if self._serial is None:
169            if self._warn:
170                warning(f"节点 {self.name} 串口未初始化,无法发送数据")
171            return
172        if self._checksum_type !=CheckSumType.NONE:
173            data += self._get_check_sum(data)
174        if self._header:
175            data = self._add_header(data)
176        self._serial.write(data)
177        return data

发送串口数据

def receive(self, length: int) -> bytes:
179    def receive(self, length: int) -> bytes:
180        """ 
181        接收串口数据 
182        
183            :param len: 接收数据的长度
184        """
185        if self._serial is None:
186            if self._warn:
187                warning(f"节点 {self.name} 串口未初始化,无法接收数据")
188            return
189        
190        in_waiting = self._serial.in_waiting
191        if in_waiting == 0:
192            return None
193        
194        if self._header and self._header_flag < len(self._header):
195            target_data = self._header[self._header_flag]
196            data = self._serial.read(1)
197            if ord(data) == target_data:
198                self._header_flag += 1
199                self._receive_data += data
200            else:
201                self._header_flag = 0
202                self._receive_data = bytes()
203            return None
204        
205        pending_length = length - len(self._receive_data)
206        self._receive_data += self._serial.read(min(in_waiting, pending_length))
207        if len(self._receive_data) >= length:
208            data = self._receive_data[:length]
209            self._receive_data = self._receive_data[length:]
210            self._header_flag = 0
211            return data
212        else:
213            return None

接收串口数据

:param length: 接收数据的长度
def check_sum(self, data: bytes) -> bool:
240    def check_sum(self, data: bytes) -> bool:
241        """ 校验串口数据 """
242        if self._checksum_type == CheckSumType.NONE:
243            return True
244        checksum_length = checksum_length_map.get(self._checksum_type)
245        if checksum_length is None:
246            raise ValueError("无效的校验和类型,无法进行校验")
247
248        data_to_check = data[len(self._header):-checksum_length]
249        expected_checksum = data[-checksum_length:]
250        calculated_checksum = self._get_check_sum(data_to_check)
251
252        return calculated_checksum == expected_checksum

校验串口数据