robotengine.robotlink
1from robotengine.node import Node 2from robotengine.serial_io import SerialIO, DeviceType, CheckSumType 3import threading 4import time 5from robotengine.tools import hex_to_str 6 7class RobotLink(Node): 8 def __init__(self, name="RobotLink"): 9 super().__init__(name) 10 self.sio = SerialIO(name="SerialIO", 11 device_type=DeviceType.STM32F407, 12 checksum_type=CheckSumType.SUM16, 13 header=[0x0D, 0x0A], 14 baudrate=115200, 15 timeout=1.0) 16 17 self.add_child(self.sio) 18 19 self._data_length = 36 20 21 self._receive_thread = threading.Thread(target=self._receive, daemon=True) 22 self._receive_thread.start() 23 24 self._receive_data = None 25 26 def _receive(self): 27 while True: 28 self._receive_data = self.sio.receive(self._data_length) 29 if self._receive_data: 30 print("RX: ", hex_to_str(self._receive_data)) 31 time.sleep(0.01) 32 33 def _process(self, delta) -> None: 34 # data = self.sio.transmit(self.sio.fixed_bytes(0x0A, 32)) 35 data = self.sio.transmit(self.sio.random_bytes(32)) 36 print("TX: ", hex_to_str(data))
8class RobotLink(Node): 9 def __init__(self, name="RobotLink"): 10 super().__init__(name) 11 self.sio = SerialIO(name="SerialIO", 12 device_type=DeviceType.STM32F407, 13 checksum_type=CheckSumType.SUM16, 14 header=[0x0D, 0x0A], 15 baudrate=115200, 16 timeout=1.0) 17 18 self.add_child(self.sio) 19 20 self._data_length = 36 21 22 self._receive_thread = threading.Thread(target=self._receive, daemon=True) 23 self._receive_thread.start() 24 25 self._receive_data = None 26 27 def _receive(self): 28 while True: 29 self._receive_data = self.sio.receive(self._data_length) 30 if self._receive_data: 31 print("RX: ", hex_to_str(self._receive_data)) 32 time.sleep(0.01) 33 34 def _process(self, delta) -> None: 35 # data = self.sio.transmit(self.sio.fixed_bytes(0x0A, 32)) 36 data = self.sio.transmit(self.sio.random_bytes(32)) 37 print("TX: ", hex_to_str(data))
Node 基类
RobotLink(name='RobotLink')
9 def __init__(self, name="RobotLink"): 10 super().__init__(name) 11 self.sio = SerialIO(name="SerialIO", 12 device_type=DeviceType.STM32F407, 13 checksum_type=CheckSumType.SUM16, 14 header=[0x0D, 0x0A], 15 baudrate=115200, 16 timeout=1.0) 17 18 self.add_child(self.sio) 19 20 self._data_length = 36 21 22 self._receive_thread = threading.Thread(target=self._receive, daemon=True) 23 self._receive_thread.start() 24 25 self._receive_data = None
初始化节点, 需要指定节点名称