robotengine.robot_link

robot_link 是 robotengine 控制 ho 机器人的节点。

robot_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。

如果配置了 url robot_link 节点会自动发送机器人的状态 HoRobotState 到 url 指定的地址。

robot_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。

使用 robot_link.update() 函数可以向机器人发送数据。

挂载 robot_link 节点后, _process() 的处理速度会显著受到影响,请酌情调整 engine 的运行频率。

  1"""
  2
  3robot_link 是 robotengine 控制 ho 机器人的节点。
  4
  5robot_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。
  6
  7如果配置了 url robot_link 节点会自动发送机器人的状态 HoRobotState 到 url 指定的地址。
  8
  9robot_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。
 10
 11使用 robot_link.update() 函数可以向机器人发送数据。
 12
 13挂载 robot_link 节点后, _process() 的处理速度会显著受到影响,请酌情调整 engine 的运行频率。
 14
 15"""
 16
 17from robotengine.node import Node
 18from robotengine.serial_io import SerialIO, DeviceType, CheckSumType
 19from robotengine.tools import hex2str, warning, error
 20from robotengine.signal import Signal
 21from robotengine.timer import Timer
 22from typing import List, Tuple
 23from enum import Enum
 24import requests
 25import aiohttp
 26import asyncio
 27import threading
 28import time
 29import random
 30
 31class HoMode(Enum):
 32    """ Ho 电机模态 """
 33    S = 0
 34    """ 停止 """
 35    I = 1
 36    """ 电流控制 """
 37    V = 2
 38    """ 速度控制 """
 39    P = 3
 40    """ 位置控制 """
 41
 42class AlignState:
 43    """ 帧和时间戳对齐的状态数据 """
 44    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float):
 45        """ 初始化对齐状态数据 """
 46        self.id = id
 47        """ 电机 id """
 48        self.i: float = i
 49        """ 电流 """
 50        self.v: float = v
 51        """ 速度 """
 52        self.p: float = p
 53        """ 位置 """
 54        self.frame = frame
 55        """ 此状态数据对应的帧 """
 56        self.timestamp = timestamp
 57        """ 此状态数据对应的时间戳 """
 58
 59    def to_dict(self):
 60        """ 转换为字典 """
 61        return {
 62            "id": self.id,
 63            "i": self.i,
 64            "v": self.v,
 65            "p": self.p,
 66            "frame": self.frame,
 67            "timestamp": self.timestamp
 68        }
 69
 70    def __repr__(self):
 71        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"
 72    
 73class HoRobotState:
 74    """ Ho 机器人状态 """
 75    def __init__(self, states: List[AlignState]) -> None:
 76        """ 初始化 Ho 机器人状态 """
 77        self._states = states
 78
 79    def get_state(self, id: int) -> AlignState:
 80        """ 获取指定 id 的状态 """
 81        for state in self._states:
 82            if state.id == id:
 83                return state
 84        return None
 85    
 86    def to_dict(self):
 87        """ 转换为字典 """
 88        return {
 89            "states": [state.to_dict() for state in self._states]
 90        }
 91    
 92    def __repr__(self):
 93        state_str = ""
 94        for state in self._states:
 95            state_str += str(state)
 96            if state != self._states[-1]:
 97                state_str += "\n"
 98        return f"HoRobotState(\n{state_str})"
 99
100class HoRobotLink(Node):
101    """ Ho 机器人链接节点 """
102    def __init__(self, name="HoRobotLink", buffer_capacity: int=1024, url=None, warn=True) -> None:
103        """ 初始化 Ho 机器人链接节点 """
104        super().__init__(name)
105        self._data_length = 84
106        self._receive_data = None
107        self._url = url
108        self._warn = warn
109        self._shutdown = threading.Event()
110        if self._url:
111            self._pending_requests = []
112            self._http_thread = threading.Thread(target=self._http_request)
113            self._http_thread.start()
114
115        self.buffer_capacity: int = buffer_capacity
116        """ 存储状态数据的缓冲区的容量 """
117        self.state_buffer: List[HoRobotState] = []
118        """ 存储状态数据的缓冲区 """
119
120        self.sio: SerialIO = SerialIO(name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=True, baudrate=1000000, timeout=1.0)
121        """ 串口节点 RobotLink 会主动挂载一个已经配置好的串口节点 """
122        self.add_child(self.sio)
123
124        self.receive: Signal = Signal(bytes)
125        """ 信号,当接收到数据时触发(无论是否通过校验和) """
126        self.robot_state_update: Signal = Signal(HoRobotState)
127        """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """
128
129    def _ready(self) -> None:
130        pass
131
132    def _add_pending_request(self, robot_state: HoRobotState):
133        self._pending_requests.append(robot_state)
134        if len(self._pending_requests) > 32:
135            self._pending_requests.pop(0)
136
137    def _http_request(self):
138        async def send_request(robot_state: HoRobotState) -> None:
139            try:
140                async with aiohttp.ClientSession() as session:
141                    async with session.post(self._url, json=robot_state.to_dict()) as response:
142                        pass
143                        # print(await response.json())
144            except aiohttp.ClientError as e:
145                if self._warn:
146                    warning(f"{self.name}{self._url} 发送请求时发生错误: {e}")
147            except asyncio.TimeoutError:
148                if self._warn:
149                    warning(f"{self.name}{self._url} 发送请求时发生超时错误")
150            except Exception as e:
151                if self._warn:
152                    warning(f"{self.name}{self._url} 发送请求时发生未知错误: {e}")
153
154        while not self._shutdown.is_set():
155            if self._pending_requests:
156                robot_state = self._pending_requests.pop(0)
157                asyncio.run(send_request(robot_state))
158            time.sleep(0.01)
159
160    def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None:
161        """ 向机器人发送数据 """
162        data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \
163            self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2)
164        self.sio.transmit(data)
165
166    def _process(self, delta) -> None:
167        self._receive_data = self.sio.receive(self._data_length)
168        if self._receive_data:
169            if self.sio.check_sum(self._receive_data):
170                states = []
171                receive_data = self._receive_data[2:-2]
172
173                id = 1
174                for i in range(0, 80, 10):
175                    _data = receive_data[i:i+10]
176                    _p = self._decode(_data[0:4], 100.0, 4)
177                    _v = self._decode(_data[4:8], 100.0, 4)
178                    _i = self._decode(_data[8:10], 100.0, 2)
179
180                    align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp())
181                    states.append(align_state)
182                    id += 1
183
184                robot_state = HoRobotState(states)
185                self.state_buffer.append(robot_state)
186
187                if len(self.state_buffer) > self.buffer_capacity:
188                    self.state_buffer.pop(0)
189
190                self.robot_state_update.emit(robot_state)
191                if self._url:
192                    self._add_pending_request(robot_state)
193            else:
194                if self._warn:
195                    warning(f"{self.name} 接收数据 {hex2str(self._receive_data)} 校验和错误")
196            self.receive.emit(self._receive_data)
197
198    def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes:
199        max_value = (1 << (8 * byte_length - 1))
200        max_scaled_value = max_value / scale_factor
201
202        if abs(value) >= max_scaled_value:
203            raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]")
204
205        encoded_value = int(value * scale_factor) + max_value
206        
207        max_value_for_length = (1 << (8 * byte_length)) - 1
208        if encoded_value > max_value_for_length:
209            raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}")
210
211        byte_data = []
212        for i in range(byte_length):
213            byte_data.insert(0, encoded_value & 0xFF)
214            encoded_value >>= 8
215
216        return bytes(byte_data)
217
218    def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float:
219        if len(data) != byte_length:
220            raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配")
221        max_value = (1 << (8 * byte_length - 1))
222
223        decoded_value = 0
224        for i in range(byte_length):
225            decoded_value <<= 8
226            decoded_value |= data[i]
227        
228        decoded_value -= max_value
229
230        return decoded_value / scale_factor
class HoMode(enum.Enum):
32class HoMode(Enum):
33    """ Ho 电机模态 """
34    S = 0
35    """ 停止 """
36    I = 1
37    """ 电流控制 """
38    V = 2
39    """ 速度控制 """
40    P = 3
41    """ 位置控制 """

Ho 电机模态

S = <HoMode.S: 0>

停止

I = <HoMode.I: 1>

电流控制

V = <HoMode.V: 2>

速度控制

P = <HoMode.P: 3>

位置控制

Inherited Members
enum.Enum
name
value
class AlignState:
43class AlignState:
44    """ 帧和时间戳对齐的状态数据 """
45    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float):
46        """ 初始化对齐状态数据 """
47        self.id = id
48        """ 电机 id """
49        self.i: float = i
50        """ 电流 """
51        self.v: float = v
52        """ 速度 """
53        self.p: float = p
54        """ 位置 """
55        self.frame = frame
56        """ 此状态数据对应的帧 """
57        self.timestamp = timestamp
58        """ 此状态数据对应的时间戳 """
59
60    def to_dict(self):
61        """ 转换为字典 """
62        return {
63            "id": self.id,
64            "i": self.i,
65            "v": self.v,
66            "p": self.p,
67            "frame": self.frame,
68            "timestamp": self.timestamp
69        }
70
71    def __repr__(self):
72        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"

帧和时间戳对齐的状态数据

AlignState(id: int, i: float, v: float, p: float, frame: int, timestamp: float)
45    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float):
46        """ 初始化对齐状态数据 """
47        self.id = id
48        """ 电机 id """
49        self.i: float = i
50        """ 电流 """
51        self.v: float = v
52        """ 速度 """
53        self.p: float = p
54        """ 位置 """
55        self.frame = frame
56        """ 此状态数据对应的帧 """
57        self.timestamp = timestamp
58        """ 此状态数据对应的时间戳 """

初始化对齐状态数据

id

电机 id

i: float

电流

v: float

速度

p: float

位置

frame

此状态数据对应的帧

timestamp

此状态数据对应的时间戳

def to_dict(self):
60    def to_dict(self):
61        """ 转换为字典 """
62        return {
63            "id": self.id,
64            "i": self.i,
65            "v": self.v,
66            "p": self.p,
67            "frame": self.frame,
68            "timestamp": self.timestamp
69        }

转换为字典

class HoRobotState:
74class HoRobotState:
75    """ Ho 机器人状态 """
76    def __init__(self, states: List[AlignState]) -> None:
77        """ 初始化 Ho 机器人状态 """
78        self._states = states
79
80    def get_state(self, id: int) -> AlignState:
81        """ 获取指定 id 的状态 """
82        for state in self._states:
83            if state.id == id:
84                return state
85        return None
86    
87    def to_dict(self):
88        """ 转换为字典 """
89        return {
90            "states": [state.to_dict() for state in self._states]
91        }
92    
93    def __repr__(self):
94        state_str = ""
95        for state in self._states:
96            state_str += str(state)
97            if state != self._states[-1]:
98                state_str += "\n"
99        return f"HoRobotState(\n{state_str})"

Ho 机器人状态

HoRobotState(states: List[AlignState])
76    def __init__(self, states: List[AlignState]) -> None:
77        """ 初始化 Ho 机器人状态 """
78        self._states = states

初始化 Ho 机器人状态

def get_state(self, id: int) -> AlignState:
80    def get_state(self, id: int) -> AlignState:
81        """ 获取指定 id 的状态 """
82        for state in self._states:
83            if state.id == id:
84                return state
85        return None

获取指定 id 的状态

def to_dict(self):
87    def to_dict(self):
88        """ 转换为字典 """
89        return {
90            "states": [state.to_dict() for state in self._states]
91        }

转换为字典