robotengine.robot_link
robot_link 是 robotengine 控制 ho 机器人的节点。
robot_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。
如果配置了 url robot_link 节点会自动发送机器人的状态 HoRobotState 到 url 指定的地址。
robot_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。
使用 robot_link.update() 函数可以向机器人发送数据。
挂载 robot_link 节点后, _process() 的处理速度会显著受到影响,请酌情调整 engine 的运行频率。
1""" 2 3robot_link 是 robotengine 控制 ho 机器人的节点。 4 5robot_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。 6 7如果配置了 url robot_link 节点会自动发送机器人的状态 HoRobotState 到 url 指定的地址。 8 9robot_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。 10 11使用 robot_link.update() 函数可以向机器人发送数据。 12 13挂载 robot_link 节点后, _process() 的处理速度会显著受到影响,请酌情调整 engine 的运行频率。 14 15""" 16 17from robotengine.node import Node 18from robotengine.serial_io import SerialIO, DeviceType, CheckSumType 19from robotengine.tools import hex2str, warning, error 20from robotengine.signal import Signal 21from robotengine.timer import Timer 22from typing import List, Tuple 23from enum import Enum 24import requests 25import aiohttp 26import asyncio 27import threading 28import time 29import random 30 31class HoMode(Enum): 32 """ Ho 电机模态 """ 33 S = 0 34 """ 停止 """ 35 I = 1 36 """ 电流控制 """ 37 V = 2 38 """ 速度控制 """ 39 P = 3 40 """ 位置控制 """ 41 42class AlignState: 43 """ 帧和时间戳对齐的状态数据 """ 44 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float): 45 """ 初始化对齐状态数据 """ 46 self.id = id 47 """ 电机 id """ 48 self.i: float = i 49 """ 电流 """ 50 self.v: float = v 51 """ 速度 """ 52 self.p: float = p 53 """ 位置 """ 54 self.frame = frame 55 """ 此状态数据对应的帧 """ 56 self.timestamp = timestamp 57 """ 此状态数据对应的时间戳 """ 58 59 def to_dict(self): 60 """ 转换为字典 """ 61 return { 62 "id": self.id, 63 "i": self.i, 64 "v": self.v, 65 "p": self.p, 66 "frame": self.frame, 67 "timestamp": self.timestamp 68 } 69 70 def __repr__(self): 71 return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})" 72 73class HoRobotState: 74 """ Ho 机器人状态 """ 75 def __init__(self, states: List[AlignState]) -> None: 76 """ 初始化 Ho 机器人状态 """ 77 self._states = states 78 79 def get_state(self, id: int) -> AlignState: 80 """ 获取指定 id 的状态 """ 81 for state in self._states: 82 if state.id == id: 83 return state 84 return None 85 86 def to_dict(self): 87 """ 转换为字典 """ 88 return { 89 "states": [state.to_dict() for state in self._states] 90 } 91 92 def __repr__(self): 93 state_str = "" 94 for state in self._states: 95 state_str += str(state) 96 if state != self._states[-1]: 97 state_str += "\n" 98 return f"HoRobotState(\n{state_str})" 99 100class HoRobotLink(Node): 101 """ Ho 机器人链接节点 """ 102 def __init__(self, name="HoRobotLink", buffer_capacity: int=1024, url=None, warn=True) -> None: 103 """ 初始化 Ho 机器人链接节点 """ 104 super().__init__(name) 105 self._data_length = 84 106 self._receive_data = None 107 self._url = url 108 self._warn = warn 109 self._shutdown = threading.Event() 110 if self._url: 111 self._pending_requests = [] 112 self._http_thread = threading.Thread(target=self._http_request) 113 self._http_thread.start() 114 115 self.buffer_capacity: int = buffer_capacity 116 """ 存储状态数据的缓冲区的容量 """ 117 self.state_buffer: List[HoRobotState] = [] 118 """ 存储状态数据的缓冲区 """ 119 120 self.sio: SerialIO = SerialIO(name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=True, baudrate=1000000, timeout=1.0) 121 """ 串口节点 RobotLink 会主动挂载一个已经配置好的串口节点 """ 122 self.add_child(self.sio) 123 124 self.receive: Signal = Signal(bytes) 125 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 126 self.robot_state_update: Signal = Signal(HoRobotState) 127 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """ 128 129 def _ready(self) -> None: 130 pass 131 132 def _add_pending_request(self, robot_state: HoRobotState): 133 self._pending_requests.append(robot_state) 134 if len(self._pending_requests) > 32: 135 self._pending_requests.pop(0) 136 137 def _http_request(self): 138 async def send_request(robot_state: HoRobotState) -> None: 139 try: 140 async with aiohttp.ClientSession() as session: 141 async with session.post(self._url, json=robot_state.to_dict()) as response: 142 pass 143 # print(await response.json()) 144 except aiohttp.ClientError as e: 145 if self._warn: 146 warning(f"{self.name} 向 {self._url} 发送请求时发生错误: {e}") 147 except asyncio.TimeoutError: 148 if self._warn: 149 warning(f"{self.name} 向 {self._url} 发送请求时发生超时错误") 150 except Exception as e: 151 if self._warn: 152 warning(f"{self.name} 向 {self._url} 发送请求时发生未知错误: {e}") 153 154 while not self._shutdown.is_set(): 155 if self._pending_requests: 156 robot_state = self._pending_requests.pop(0) 157 asyncio.run(send_request(robot_state)) 158 time.sleep(0.01) 159 160 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 161 """ 向机器人发送数据 """ 162 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 163 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 164 self.sio.transmit(data) 165 166 def _process(self, delta) -> None: 167 self._receive_data = self.sio.receive(self._data_length) 168 if self._receive_data: 169 if self.sio.check_sum(self._receive_data): 170 states = [] 171 receive_data = self._receive_data[2:-2] 172 173 id = 1 174 for i in range(0, 80, 10): 175 _data = receive_data[i:i+10] 176 _p = self._decode(_data[0:4], 100.0, 4) 177 _v = self._decode(_data[4:8], 100.0, 4) 178 _i = self._decode(_data[8:10], 100.0, 2) 179 180 align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp()) 181 states.append(align_state) 182 id += 1 183 184 robot_state = HoRobotState(states) 185 self.state_buffer.append(robot_state) 186 187 if len(self.state_buffer) > self.buffer_capacity: 188 self.state_buffer.pop(0) 189 190 self.robot_state_update.emit(robot_state) 191 if self._url: 192 self._add_pending_request(robot_state) 193 else: 194 if self._warn: 195 warning(f"{self.name} 接收数据 {hex2str(self._receive_data)} 校验和错误") 196 self.receive.emit(self._receive_data) 197 198 def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes: 199 max_value = (1 << (8 * byte_length - 1)) 200 max_scaled_value = max_value / scale_factor 201 202 if abs(value) >= max_scaled_value: 203 raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]") 204 205 encoded_value = int(value * scale_factor) + max_value 206 207 max_value_for_length = (1 << (8 * byte_length)) - 1 208 if encoded_value > max_value_for_length: 209 raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}") 210 211 byte_data = [] 212 for i in range(byte_length): 213 byte_data.insert(0, encoded_value & 0xFF) 214 encoded_value >>= 8 215 216 return bytes(byte_data) 217 218 def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float: 219 if len(data) != byte_length: 220 raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配") 221 max_value = (1 << (8 * byte_length - 1)) 222 223 decoded_value = 0 224 for i in range(byte_length): 225 decoded_value <<= 8 226 decoded_value |= data[i] 227 228 decoded_value -= max_value 229 230 return decoded_value / scale_factor
class
HoMode(enum.Enum):
32class HoMode(Enum): 33 """ Ho 电机模态 """ 34 S = 0 35 """ 停止 """ 36 I = 1 37 """ 电流控制 """ 38 V = 2 39 """ 速度控制 """ 40 P = 3 41 """ 位置控制 """
Ho 电机模态
Inherited Members
- enum.Enum
- name
- value
class
AlignState:
43class AlignState: 44 """ 帧和时间戳对齐的状态数据 """ 45 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float): 46 """ 初始化对齐状态数据 """ 47 self.id = id 48 """ 电机 id """ 49 self.i: float = i 50 """ 电流 """ 51 self.v: float = v 52 """ 速度 """ 53 self.p: float = p 54 """ 位置 """ 55 self.frame = frame 56 """ 此状态数据对应的帧 """ 57 self.timestamp = timestamp 58 """ 此状态数据对应的时间戳 """ 59 60 def to_dict(self): 61 """ 转换为字典 """ 62 return { 63 "id": self.id, 64 "i": self.i, 65 "v": self.v, 66 "p": self.p, 67 "frame": self.frame, 68 "timestamp": self.timestamp 69 } 70 71 def __repr__(self): 72 return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"
帧和时间戳对齐的状态数据
AlignState(id: int, i: float, v: float, p: float, frame: int, timestamp: float)
45 def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float): 46 """ 初始化对齐状态数据 """ 47 self.id = id 48 """ 电机 id """ 49 self.i: float = i 50 """ 电流 """ 51 self.v: float = v 52 """ 速度 """ 53 self.p: float = p 54 """ 位置 """ 55 self.frame = frame 56 """ 此状态数据对应的帧 """ 57 self.timestamp = timestamp 58 """ 此状态数据对应的时间戳 """
初始化对齐状态数据
class
HoRobotState:
74class HoRobotState: 75 """ Ho 机器人状态 """ 76 def __init__(self, states: List[AlignState]) -> None: 77 """ 初始化 Ho 机器人状态 """ 78 self._states = states 79 80 def get_state(self, id: int) -> AlignState: 81 """ 获取指定 id 的状态 """ 82 for state in self._states: 83 if state.id == id: 84 return state 85 return None 86 87 def to_dict(self): 88 """ 转换为字典 """ 89 return { 90 "states": [state.to_dict() for state in self._states] 91 } 92 93 def __repr__(self): 94 state_str = "" 95 for state in self._states: 96 state_str += str(state) 97 if state != self._states[-1]: 98 state_str += "\n" 99 return f"HoRobotState(\n{state_str})"
Ho 机器人状态
HoRobotState(states: List[AlignState])
76 def __init__(self, states: List[AlignState]) -> None: 77 """ 初始化 Ho 机器人状态 """ 78 self._states = states
初始化 Ho 机器人状态
101class HoRobotLink(Node): 102 """ Ho 机器人链接节点 """ 103 def __init__(self, name="HoRobotLink", buffer_capacity: int=1024, url=None, warn=True) -> None: 104 """ 初始化 Ho 机器人链接节点 """ 105 super().__init__(name) 106 self._data_length = 84 107 self._receive_data = None 108 self._url = url 109 self._warn = warn 110 self._shutdown = threading.Event() 111 if self._url: 112 self._pending_requests = [] 113 self._http_thread = threading.Thread(target=self._http_request) 114 self._http_thread.start() 115 116 self.buffer_capacity: int = buffer_capacity 117 """ 存储状态数据的缓冲区的容量 """ 118 self.state_buffer: List[HoRobotState] = [] 119 """ 存储状态数据的缓冲区 """ 120 121 self.sio: SerialIO = SerialIO(name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=True, baudrate=1000000, timeout=1.0) 122 """ 串口节点 RobotLink 会主动挂载一个已经配置好的串口节点 """ 123 self.add_child(self.sio) 124 125 self.receive: Signal = Signal(bytes) 126 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 127 self.robot_state_update: Signal = Signal(HoRobotState) 128 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """ 129 130 def _ready(self) -> None: 131 pass 132 133 def _add_pending_request(self, robot_state: HoRobotState): 134 self._pending_requests.append(robot_state) 135 if len(self._pending_requests) > 32: 136 self._pending_requests.pop(0) 137 138 def _http_request(self): 139 async def send_request(robot_state: HoRobotState) -> None: 140 try: 141 async with aiohttp.ClientSession() as session: 142 async with session.post(self._url, json=robot_state.to_dict()) as response: 143 pass 144 # print(await response.json()) 145 except aiohttp.ClientError as e: 146 if self._warn: 147 warning(f"{self.name} 向 {self._url} 发送请求时发生错误: {e}") 148 except asyncio.TimeoutError: 149 if self._warn: 150 warning(f"{self.name} 向 {self._url} 发送请求时发生超时错误") 151 except Exception as e: 152 if self._warn: 153 warning(f"{self.name} 向 {self._url} 发送请求时发生未知错误: {e}") 154 155 while not self._shutdown.is_set(): 156 if self._pending_requests: 157 robot_state = self._pending_requests.pop(0) 158 asyncio.run(send_request(robot_state)) 159 time.sleep(0.01) 160 161 def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None: 162 """ 向机器人发送数据 """ 163 data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \ 164 self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2) 165 self.sio.transmit(data) 166 167 def _process(self, delta) -> None: 168 self._receive_data = self.sio.receive(self._data_length) 169 if self._receive_data: 170 if self.sio.check_sum(self._receive_data): 171 states = [] 172 receive_data = self._receive_data[2:-2] 173 174 id = 1 175 for i in range(0, 80, 10): 176 _data = receive_data[i:i+10] 177 _p = self._decode(_data[0:4], 100.0, 4) 178 _v = self._decode(_data[4:8], 100.0, 4) 179 _i = self._decode(_data[8:10], 100.0, 2) 180 181 align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp()) 182 states.append(align_state) 183 id += 1 184 185 robot_state = HoRobotState(states) 186 self.state_buffer.append(robot_state) 187 188 if len(self.state_buffer) > self.buffer_capacity: 189 self.state_buffer.pop(0) 190 191 self.robot_state_update.emit(robot_state) 192 if self._url: 193 self._add_pending_request(robot_state) 194 else: 195 if self._warn: 196 warning(f"{self.name} 接收数据 {hex2str(self._receive_data)} 校验和错误") 197 self.receive.emit(self._receive_data) 198 199 def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes: 200 max_value = (1 << (8 * byte_length - 1)) 201 max_scaled_value = max_value / scale_factor 202 203 if abs(value) >= max_scaled_value: 204 raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]") 205 206 encoded_value = int(value * scale_factor) + max_value 207 208 max_value_for_length = (1 << (8 * byte_length)) - 1 209 if encoded_value > max_value_for_length: 210 raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}") 211 212 byte_data = [] 213 for i in range(byte_length): 214 byte_data.insert(0, encoded_value & 0xFF) 215 encoded_value >>= 8 216 217 return bytes(byte_data) 218 219 def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float: 220 if len(data) != byte_length: 221 raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配") 222 max_value = (1 << (8 * byte_length - 1)) 223 224 decoded_value = 0 225 for i in range(byte_length): 226 decoded_value <<= 8 227 decoded_value |= data[i] 228 229 decoded_value -= max_value 230 231 return decoded_value / scale_factor
Ho 机器人链接节点
HoRobotLink(name='HoRobotLink', buffer_capacity: int = 1024, url=None, warn=True)
103 def __init__(self, name="HoRobotLink", buffer_capacity: int=1024, url=None, warn=True) -> None: 104 """ 初始化 Ho 机器人链接节点 """ 105 super().__init__(name) 106 self._data_length = 84 107 self._receive_data = None 108 self._url = url 109 self._warn = warn 110 self._shutdown = threading.Event() 111 if self._url: 112 self._pending_requests = [] 113 self._http_thread = threading.Thread(target=self._http_request) 114 self._http_thread.start() 115 116 self.buffer_capacity: int = buffer_capacity 117 """ 存储状态数据的缓冲区的容量 """ 118 self.state_buffer: List[HoRobotState] = [] 119 """ 存储状态数据的缓冲区 """ 120 121 self.sio: SerialIO = SerialIO(name="SerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=True, baudrate=1000000, timeout=1.0) 122 """ 串口节点 RobotLink 会主动挂载一个已经配置好的串口节点 """ 123 self.add_child(self.sio) 124 125 self.receive: Signal = Signal(bytes) 126 """ 信号,当接收到数据时触发(无论是否通过校验和) """ 127 self.robot_state_update: Signal = Signal(HoRobotState) 128 """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """
初始化 Ho 机器人链接节点