robotengine.robotlink

 1from robotengine.node import Node
 2from robotengine.serial_io import SerialIO, DeviceType, CheckSumType
 3import threading
 4import time
 5from robotengine.tools import hex_to_str
 6
 7class RobotLink(Node):
 8    def __init__(self, name="RobotLink"):
 9        super().__init__(name)
10        self.sio = SerialIO(name="SerialIO",
11                           device_type=DeviceType.STM32F407,
12                           checksum_type=CheckSumType.SUM16,
13                           header=[0x0D, 0x0A],
14                           baudrate=115200,
15                           timeout=1.0)
16        
17        self.add_child(self.sio)
18
19        self._data_length = 36
20
21        self._receive_thread = threading.Thread(target=self._receive, daemon=True)
22        self._receive_thread.start()
23
24        self._receive_data = None
25
26    def _receive(self):
27        while True:
28            self._receive_data = self.sio.receive(self._data_length)
29            if self._receive_data:
30                print("RX: ", hex_to_str(self._receive_data))
31            time.sleep(0.01)
32
33    def _process(self, delta) -> None:
34        # data = self.sio.transmit(self.sio.fixed_bytes(0x0A, 32))
35        data = self.sio.transmit(self.sio.random_bytes(32))
36        print("TX: ", hex_to_str(data))