robotengine.ho_robot

ho_robot 是 robotengine 控制 ho 机器人的节点。

ho_robot 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。

如果配置了 url ho_robot 节点会自动发送机器人的状态 HoState 到 url 指定的地址。

ho_robot 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。

使用 ho_robot.update() 函数可以向机器人发送数据。

挂载 ho_robot 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。

  1"""
  2
  3ho_robot 是 robotengine 控制 ho 机器人的节点。
  4
  5ho_robot 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。
  6
  7如果配置了 url ho_robot 节点会自动发送机器人的状态 HoState 到 url 指定的地址。
  8
  9ho_robot 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。
 10
 11使用 ho_robot.update() 函数可以向机器人发送数据。
 12
 13挂载 ho_robot 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。
 14
 15"""
 16
 17from robotengine.node import Node
 18from robotengine.serial_io import SerialIO, DeviceType, CheckSumType
 19from robotengine.tools import hex2str, warning, error, info
 20from robotengine.signal import Signal
 21from robotengine.timer import Timer
 22from typing import List, Tuple
 23from enum import Enum
 24import requests
 25import threading
 26import time
 27import random
 28import multiprocessing
 29import tkinter as tk
 30from ttkbootstrap import ttk
 31import ttkbootstrap as ttkb
 32from fastapi import FastAPI, Request
 33import uvicorn
 34from urllib.parse import urlparse
 35import copy
 36
 37class HoMode(Enum):
 38    """ Ho 电机模态 """
 39    S = 0
 40    """ 停止 """
 41    I = 1
 42    """ 电流控制 """
 43    V = 2
 44    """ 速度控制 """
 45    P = 3
 46    """ 位置控制 """
 47
 48class AlignState:
 49    """ 帧和时间戳对齐的状态数据 """
 50    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
 51        """ 
 52        初始化对齐状态数据
 53
 54            :param id: 电机 id
 55            :param i: 电流
 56            :param v: 速度
 57            :param p: 位置
 58            :param frame: 当前帧
 59            :param timestamp: 当前时间戳 
 60        """
 61        self.id = id
 62        """ 电机 id """
 63        self.i: float = i
 64        """ 电流 """
 65        self.v: float = v
 66        """ 速度 """
 67        self.p: float = p
 68        """ 位置 """
 69        self.frame = frame
 70        """ 此状态数据对应的帧 """
 71        self.timestamp = timestamp
 72        """ 此状态数据对应的时间戳 """
 73
 74    def to_dict(self):
 75        """ 转换为字典 """
 76        return {
 77            "id": self.id,
 78            "i": self.i,
 79            "v": self.v,
 80            "p": self.p,
 81            "frame": self.frame,
 82            "timestamp": self.timestamp
 83        }
 84
 85    def __repr__(self):
 86        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"
 87    
 88class HoState:
 89    """ Ho 机器人状态 """
 90    def __init__(self, states: List[AlignState], random_state=False) -> None:
 91        """ 
 92        初始化 Ho 机器人状态
 93
 94            :param states: 帧和时间戳对齐的状态数据列表
 95            :param random_state: 是否随机生成状态数据
 96        """
 97        if not random_state:
 98            self._states = states
 99        else:
100            self._states = []
101            for i in range(1, 9):
102                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
103
104    def get_state(self, id: int) -> AlignState:
105        """ 
106        获取指定 id 的状态 
107        """
108        for state in self._states:
109            if state.id == id:
110                return state
111        return None
112    
113    def get_states(self) -> List[AlignState]:
114        """ 
115        获取所有状态 
116        """
117        return self._states
118    
119    def to_dict(self):
120        """ 
121        转换为字典 
122        """
123        return {
124            "states": [state.to_dict() for state in self._states]
125        }
126    
127    def __repr__(self):
128        state_str = ""
129        for state in self._states:
130            state_str += str(state)
131            if state != self._states[-1]:
132                state_str += "\n"
133        return f"HoState(\n{state_str})"
134
135class HoLink(Node):
136    """ Ho 机器人链接节点 """
137    def __init__(self, name="HoLink", buffer_capacity: int=1024, url=None, warn=True) -> None:
138        """ 
139        初始化 Ho 机器人链接节点 
140
141            :param name: 节点名称
142            :param buffer_capacity: 存储状态数据的缓冲区的容量
143            :param url: 数据发送的 url
144            :param read_mode: 串口读取模式
145            :param warn: 是否显示警告
146        """
147        super().__init__(name)
148        self._data_length = 84
149        self._receive_data = None
150        self._url = url
151        self._warn = warn
152        
153        if self._url:
154            self._shutdown = multiprocessing.Event()
155            self._pending_capacity = 256
156            self._pending_requests = multiprocessing.Queue()
157            self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess")
158            self._http_process.start()
159
160        self.buffer_capacity: int = buffer_capacity
161        """ 存储状态数据的缓冲区的容量 """
162        self.state_buffer: List[HoState] = []
163        """ 存储状态数据的缓冲区 """
164
165        self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0)
166        """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """
167        self.add_child(self.sio)
168
169        self.receive: Signal = Signal(bytes)
170        """ 信号,当接收到数据时触发(无论是否通过校验和) """
171        self.robot_state_update: Signal = Signal(HoState)
172        """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """
173
174    def _ready(self) -> None:
175        pass
176
177    def _add_pending_request(self, ho_state: HoState):
178        """ 
179        向请求队列中添加请求 
180        """
181        self._pending_requests.put(ho_state)
182        if self._pending_requests.qsize() > self._pending_capacity:
183            if self._warn:
184                warning(f"{self.name}{self._url} 发送请求时,请求队列已满,将丢弃最早的请求,可能会导致数据丢失")
185            self._pending_requests.get()
186
187    def _send_request(self, ho_state_dict: dict) -> None:
188        start_time = time.perf_counter()
189        try:
190            response = requests.post(self._url, json=ho_state_dict, timeout=0.1)
191
192            end_time = time.perf_counter()
193            latency = end_time - start_time
194            # print(f"Request latency: {round(latency * 1000, 2)} ms")
195
196        except requests.RequestException as e:
197            if self._warn:
198                warning(f"请求失败: {e}")
199        except Exception as e:
200            if self._warn:
201                warning(f"发生未知错误: {e}")
202
203    def _http_request(self):
204        info(f"{self.name} 已开启向服务地址 {self._url} 发送数据的功能")
205        while not self._shutdown.is_set():
206            if not self._pending_requests.empty():
207                ho_state = self._pending_requests.get()
208                self._send_request(ho_state.to_dict())
209
210    def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None:
211        """ 
212        向机器人发送数据 
213        """
214        data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \
215            self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2)
216        # print(f"发送数据: {hex2str(data)}")
217        self.sio.transmit(data)
218
219    def _process(self, delta) -> None:
220        self._receive_data = self.sio.receive(self._data_length)
221        if self._receive_data:
222            if self.sio.check_sum(self._receive_data):
223                states = []
224                receive_data = self._receive_data[2:-2]
225
226                id = 1
227                for i in range(0, 80, 10):
228                    _data = receive_data[i:i+10]
229                    _p = self._decode(_data[0:4], 100.0, 4)
230                    _v = self._decode(_data[4:8], 100.0, 4)
231                    _i = self._decode(_data[8:10], 100.0, 2)
232
233                    align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp())
234                    states.append(align_state)
235                    id += 1
236
237                ho_state = HoState(states)
238                self.state_buffer.append(ho_state)
239
240                if len(self.state_buffer) > self.buffer_capacity:
241                    self.state_buffer.pop(0)
242
243                self.robot_state_update.emit(ho_state)
244                if self._url:
245                    self._add_pending_request(ho_state)
246            else:
247                if self._warn:
248                    warning(f"{self.name} 长度为 {len(self._receive_data)} 的数据 {hex2str(self._receive_data)} 校验和错误")
249            self.receive.emit(self._receive_data)
250
251    def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes:
252        max_value = (1 << (8 * byte_length - 1))
253        max_scaled_value = max_value / scale_factor
254
255        if abs(value) >= max_scaled_value:
256            raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]")
257
258        encoded_value = int(value * scale_factor) + max_value
259        
260        max_value_for_length = (1 << (8 * byte_length)) - 1
261        if encoded_value > max_value_for_length:
262            raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}")
263
264        byte_data = []
265        for i in range(byte_length):
266            byte_data.insert(0, encoded_value & 0xFF)
267            encoded_value >>= 8
268
269        return bytes(byte_data)
270
271    def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float:
272        if len(data) != byte_length:
273            raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配")
274        max_value = (1 << (8 * byte_length - 1))
275
276        decoded_value = 0
277        for i in range(byte_length):
278            decoded_value <<= 8
279            decoded_value |= data[i]
280        
281        decoded_value -= max_value
282
283        return decoded_value / scale_factor
284    
285    # def _on_engine_exit(self):
286    #     if self._url:
287    #         self._shutdown.set()
288    #         self._http_process.join()
289            
290
291class HoServer:
292    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
293        """
294        初始化 HoServer 实例。
295
296            :param url: 服务器的 URL。
297            :param capacity: 数据缓冲区的最大容量。
298            :param ui: 是否启用 UI 界面。
299            :param ui_frequency: UI 更新频率(Hz)。
300        """
301        self._url = url
302        parsed_url = urlparse(url)
303        self._host = parsed_url.hostname
304        self._port = parsed_url.port
305        self._path = parsed_url.path
306
307        self._ui = ui
308        self._ui_frequency = ui_frequency
309        self._capacity = capacity
310        self._data_buffer = []
311        """ 
312        数据缓冲区 
313        """
314
315        self._data_queue = multiprocessing.Queue()
316        self._shutdown = multiprocessing.Event()
317
318        # 启动 FastAPI 应用进程
319        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
320
321    def _update_data(self):
322        """
323        从数据队列中读取数据并更新缓冲区。
324        """
325        while not self._shutdown.is_set():
326            if not self._data_queue.empty():
327                ho_state = self._data_queue.get()
328                self._data_buffer.append(ho_state)
329                if len(self._data_buffer) > self._capacity:
330                    self._data_buffer.pop(0)
331
332    def has_data(self):
333        """
334        检查缓冲区中是否有数据。
335
336            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
337        """
338        return len(self._data_buffer) > 0
339
340    def get_data(self) -> HoState:
341        """
342        获取缓冲区中最新的数据。
343
344            :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
345        """
346        if not self.has_data():
347            return None
348        return self._data_buffer.pop(-1)
349    
350    def get_data_buffer(self) -> List[HoState]:
351        """
352        获取缓冲区。
353
354        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
355
356            :return: 缓冲区。
357        """
358        return copy.deepcopy(self._data_buffer)
359    
360    def length(self) -> int:
361        """
362        获取缓冲区中的数据长度。
363
364            :return: 缓冲区中的数据长度。
365        """
366        return len(self._data_buffer)
367
368    def _init_ui(self) -> None:
369        """
370        初始化 UI。
371        """
372        self.root = tk.Tk()
373        self.root.title("HoServer")
374        self.root.geometry("800x600")
375
376    def run(self) -> None:
377        """
378        启动服务器并运行 UI 更新线程(如果启用 UI)。
379        """
380        self._app_process.start()
381
382        # 数据更新线程
383        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
384        self._data_thread.start()
385
386        if self._ui:
387            self._init_ui()
388            # UI 更新线程
389            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
390            self._ui_thread.start()
391
392            self.root.mainloop()
393
394    def _run_app(self, path: str, host: str, port: int) -> None:
395        """
396        启动 FastAPI 服务器并监听请求。
397
398            :param path: API 路径。
399            :param host: 服务器主机。
400            :param port: 服务器端口。
401        """
402        app = FastAPI()
403        app.add_api_route(path, self._handle_data, methods=["POST"])
404
405        uvicorn.run(app, host=host, port=port)
406
407    async def _handle_data(self, request: Request) -> dict:
408        """
409        处理接收到的 POST 请求数据。
410
411            :param request: FastAPI 请求对象。
412            :return: 处理结果。
413        """
414        json_data = await request.json()
415        states_data = json_data.get("states", [])
416
417        states = []
418        for state_data in states_data:
419            state = AlignState(
420                id=state_data["id"],
421                i=state_data["i"],
422                v=state_data["v"],
423                p=state_data["p"],
424                frame=state_data["frame"],
425                timestamp=state_data["timestamp"]
426            )
427            states.append(state)
428        
429        ho_state = HoState(states=states)
430        self._data_queue.put(ho_state)
431        return {"message": "Data received"}
432
433    def _init_ui(self) -> None:
434        """
435        初始化 UI 界面。
436        """
437        self.root = ttkb.Window(themename="superhero", title="HoServer")
438
439        frame = ttk.Frame(self.root)
440        frame.pack(padx=10, pady=10)
441
442        columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p']
443        self.entries = {}
444
445        # 创建表头
446        for col, column_name in enumerate(columns):
447            label = ttk.Label(frame, text=column_name, width=5)
448            label.grid(row=0, column=col, padx=5, pady=5)
449
450        # 创建数据输入框
451        for row in range(8):
452            id_label = ttk.Label(frame, text=f"{row + 1}", width=5)
453            id_label.grid(row=row + 1, column=0, padx=5, pady=5)
454            for col in range(5):
455                entry = ttk.Entry(frame, width=15, state='normal')
456                entry.grid(row=row + 1, column=col + 1, padx=5, pady=10)
457                self.entries[(row, col)] = entry
458
459    def _update_ui(self) -> None:
460        """
461        根据数据缓冲区更新 UI 界面。
462        """
463        def update() -> None:
464            if len(self._data_buffer) == 0:
465                return
466            ho_state = self._data_buffer[-1]
467            
468            # 清空当前数据
469            for row in range(8):
470                for col in range(5):
471                    self.entries[(row, col)].delete(0, tk.END)
472
473            # 更新数据
474            for row in range(8):
475                align_state = ho_state.get_state(row + 1)
476                self.entries[(row, 0)].insert(0, str(align_state.frame))
477                self.entries[(row, 1)].insert(0, str(align_state.timestamp))
478                self.entries[(row, 2)].insert(0, str(round(align_state.i, 2)))
479                self.entries[(row, 3)].insert(0, str(round(align_state.v, 2)))
480                self.entries[(row, 4)].insert(0, str(round(align_state.p, 2)))
481
482        time_interval = 1.0 / self._ui_frequency
483        while not self._shutdown.is_set():
484            time.sleep(time_interval)
485
486            self.root.after(0, update)
487
488
489    def __del__(self) -> None:
490        """
491        清理资源,停止线程和进程。
492        """
493        self._shutdown.set()
494        self._app_process.join()
495        self._data_thread.join()
496        if self._ui:
497            self._ui_thread.join()
498
499
500
501class ManualState(Enum):
502    """ 手动状态枚举 """
503    IDLE = 0
504    """ 空闲 """
505    ALIGN = 1
506    """ 对齐 """
507    SHOOT = 2
508    """ 射击 """
509
510class HoManual(Node):
511    def __init__(self, link: HoLink, name="Manual") -> None:
512        from robotengine import StateMachine
513        super().__init__(name)
514        self._link = link
515        self.state_machine = StateMachine(ManualState.IDLE, name="StateMachine")
516
517
518# if __name__ == "__main__":
519#     ho_server = HoServer("http://127.0.0.1:7777/data", ui=False)
520#     ho_server.run()
class HoMode(enum.Enum):
38class HoMode(Enum):
39    """ Ho 电机模态 """
40    S = 0
41    """ 停止 """
42    I = 1
43    """ 电流控制 """
44    V = 2
45    """ 速度控制 """
46    P = 3
47    """ 位置控制 """

Ho 电机模态

S = <HoMode.S: 0>

停止

I = <HoMode.I: 1>

电流控制

V = <HoMode.V: 2>

速度控制

P = <HoMode.P: 3>

位置控制

Inherited Members
enum.Enum
name
value
class AlignState:
49class AlignState:
50    """ 帧和时间戳对齐的状态数据 """
51    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
52        """ 
53        初始化对齐状态数据
54
55            :param id: 电机 id
56            :param i: 电流
57            :param v: 速度
58            :param p: 位置
59            :param frame: 当前帧
60            :param timestamp: 当前时间戳 
61        """
62        self.id = id
63        """ 电机 id """
64        self.i: float = i
65        """ 电流 """
66        self.v: float = v
67        """ 速度 """
68        self.p: float = p
69        """ 位置 """
70        self.frame = frame
71        """ 此状态数据对应的帧 """
72        self.timestamp = timestamp
73        """ 此状态数据对应的时间戳 """
74
75    def to_dict(self):
76        """ 转换为字典 """
77        return {
78            "id": self.id,
79            "i": self.i,
80            "v": self.v,
81            "p": self.p,
82            "frame": self.frame,
83            "timestamp": self.timestamp
84        }
85
86    def __repr__(self):
87        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"

帧和时间戳对齐的状态数据

AlignState(id: int, i: float, v: float, p: float, frame: int, timestamp: float)
51    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
52        """ 
53        初始化对齐状态数据
54
55            :param id: 电机 id
56            :param i: 电流
57            :param v: 速度
58            :param p: 位置
59            :param frame: 当前帧
60            :param timestamp: 当前时间戳 
61        """
62        self.id = id
63        """ 电机 id """
64        self.i: float = i
65        """ 电流 """
66        self.v: float = v
67        """ 速度 """
68        self.p: float = p
69        """ 位置 """
70        self.frame = frame
71        """ 此状态数据对应的帧 """
72        self.timestamp = timestamp
73        """ 此状态数据对应的时间戳 """

初始化对齐状态数据

:param id: 电机 id
:param i: 电流
:param v: 速度
:param p: 位置
:param frame: 当前帧
:param timestamp: 当前时间戳
id

电机 id

i: float

电流

v: float

速度

p: float

位置

frame

此状态数据对应的帧

timestamp

此状态数据对应的时间戳

def to_dict(self):
75    def to_dict(self):
76        """ 转换为字典 """
77        return {
78            "id": self.id,
79            "i": self.i,
80            "v": self.v,
81            "p": self.p,
82            "frame": self.frame,
83            "timestamp": self.timestamp
84        }

转换为字典

class HoState:
 89class HoState:
 90    """ Ho 机器人状态 """
 91    def __init__(self, states: List[AlignState], random_state=False) -> None:
 92        """ 
 93        初始化 Ho 机器人状态
 94
 95            :param states: 帧和时间戳对齐的状态数据列表
 96            :param random_state: 是否随机生成状态数据
 97        """
 98        if not random_state:
 99            self._states = states
100        else:
101            self._states = []
102            for i in range(1, 9):
103                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
104
105    def get_state(self, id: int) -> AlignState:
106        """ 
107        获取指定 id 的状态 
108        """
109        for state in self._states:
110            if state.id == id:
111                return state
112        return None
113    
114    def get_states(self) -> List[AlignState]:
115        """ 
116        获取所有状态 
117        """
118        return self._states
119    
120    def to_dict(self):
121        """ 
122        转换为字典 
123        """
124        return {
125            "states": [state.to_dict() for state in self._states]
126        }
127    
128    def __repr__(self):
129        state_str = ""
130        for state in self._states:
131            state_str += str(state)
132            if state != self._states[-1]:
133                state_str += "\n"
134        return f"HoState(\n{state_str})"

Ho 机器人状态

HoState(states: List[AlignState], random_state=False)
 91    def __init__(self, states: List[AlignState], random_state=False) -> None:
 92        """ 
 93        初始化 Ho 机器人状态
 94
 95            :param states: 帧和时间戳对齐的状态数据列表
 96            :param random_state: 是否随机生成状态数据
 97        """
 98        if not random_state:
 99            self._states = states
100        else:
101            self._states = []
102            for i in range(1, 9):
103                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))

初始化 Ho 机器人状态

:param states: 帧和时间戳对齐的状态数据列表
:param random_state: 是否随机生成状态数据
def get_state(self, id: int) -> AlignState:
105    def get_state(self, id: int) -> AlignState:
106        """ 
107        获取指定 id 的状态 
108        """
109        for state in self._states:
110            if state.id == id:
111                return state
112        return None

获取指定 id 的状态

def get_states(self) -> List[AlignState]:
114    def get_states(self) -> List[AlignState]:
115        """ 
116        获取所有状态 
117        """
118        return self._states

获取所有状态

def to_dict(self):
120    def to_dict(self):
121        """ 
122        转换为字典 
123        """
124        return {
125            "states": [state.to_dict() for state in self._states]
126        }

转换为字典

class HoServer:
292class HoServer:
293    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
294        """
295        初始化 HoServer 实例。
296
297            :param url: 服务器的 URL。
298            :param capacity: 数据缓冲区的最大容量。
299            :param ui: 是否启用 UI 界面。
300            :param ui_frequency: UI 更新频率(Hz)。
301        """
302        self._url = url
303        parsed_url = urlparse(url)
304        self._host = parsed_url.hostname
305        self._port = parsed_url.port
306        self._path = parsed_url.path
307
308        self._ui = ui
309        self._ui_frequency = ui_frequency
310        self._capacity = capacity
311        self._data_buffer = []
312        """ 
313        数据缓冲区 
314        """
315
316        self._data_queue = multiprocessing.Queue()
317        self._shutdown = multiprocessing.Event()
318
319        # 启动 FastAPI 应用进程
320        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
321
322    def _update_data(self):
323        """
324        从数据队列中读取数据并更新缓冲区。
325        """
326        while not self._shutdown.is_set():
327            if not self._data_queue.empty():
328                ho_state = self._data_queue.get()
329                self._data_buffer.append(ho_state)
330                if len(self._data_buffer) > self._capacity:
331                    self._data_buffer.pop(0)
332
333    def has_data(self):
334        """
335        检查缓冲区中是否有数据。
336
337            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
338        """
339        return len(self._data_buffer) > 0
340
341    def get_data(self) -> HoState:
342        """
343        获取缓冲区中最新的数据。
344
345            :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
346        """
347        if not self.has_data():
348            return None
349        return self._data_buffer.pop(-1)
350    
351    def get_data_buffer(self) -> List[HoState]:
352        """
353        获取缓冲区。
354
355        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
356
357            :return: 缓冲区。
358        """
359        return copy.deepcopy(self._data_buffer)
360    
361    def length(self) -> int:
362        """
363        获取缓冲区中的数据长度。
364
365            :return: 缓冲区中的数据长度。
366        """
367        return len(self._data_buffer)
368
369    def _init_ui(self) -> None:
370        """
371        初始化 UI。
372        """
373        self.root = tk.Tk()
374        self.root.title("HoServer")
375        self.root.geometry("800x600")
376
377    def run(self) -> None:
378        """
379        启动服务器并运行 UI 更新线程(如果启用 UI)。
380        """
381        self._app_process.start()
382
383        # 数据更新线程
384        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
385        self._data_thread.start()
386
387        if self._ui:
388            self._init_ui()
389            # UI 更新线程
390            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
391            self._ui_thread.start()
392
393            self.root.mainloop()
394
395    def _run_app(self, path: str, host: str, port: int) -> None:
396        """
397        启动 FastAPI 服务器并监听请求。
398
399            :param path: API 路径。
400            :param host: 服务器主机。
401            :param port: 服务器端口。
402        """
403        app = FastAPI()
404        app.add_api_route(path, self._handle_data, methods=["POST"])
405
406        uvicorn.run(app, host=host, port=port)
407
408    async def _handle_data(self, request: Request) -> dict:
409        """
410        处理接收到的 POST 请求数据。
411
412            :param request: FastAPI 请求对象。
413            :return: 处理结果。
414        """
415        json_data = await request.json()
416        states_data = json_data.get("states", [])
417
418        states = []
419        for state_data in states_data:
420            state = AlignState(
421                id=state_data["id"],
422                i=state_data["i"],
423                v=state_data["v"],
424                p=state_data["p"],
425                frame=state_data["frame"],
426                timestamp=state_data["timestamp"]
427            )
428            states.append(state)
429        
430        ho_state = HoState(states=states)
431        self._data_queue.put(ho_state)
432        return {"message": "Data received"}
433
434    def _init_ui(self) -> None:
435        """
436        初始化 UI 界面。
437        """
438        self.root = ttkb.Window(themename="superhero", title="HoServer")
439
440        frame = ttk.Frame(self.root)
441        frame.pack(padx=10, pady=10)
442
443        columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p']
444        self.entries = {}
445
446        # 创建表头
447        for col, column_name in enumerate(columns):
448            label = ttk.Label(frame, text=column_name, width=5)
449            label.grid(row=0, column=col, padx=5, pady=5)
450
451        # 创建数据输入框
452        for row in range(8):
453            id_label = ttk.Label(frame, text=f"{row + 1}", width=5)
454            id_label.grid(row=row + 1, column=0, padx=5, pady=5)
455            for col in range(5):
456                entry = ttk.Entry(frame, width=15, state='normal')
457                entry.grid(row=row + 1, column=col + 1, padx=5, pady=10)
458                self.entries[(row, col)] = entry
459
460    def _update_ui(self) -> None:
461        """
462        根据数据缓冲区更新 UI 界面。
463        """
464        def update() -> None:
465            if len(self._data_buffer) == 0:
466                return
467            ho_state = self._data_buffer[-1]
468            
469            # 清空当前数据
470            for row in range(8):
471                for col in range(5):
472                    self.entries[(row, col)].delete(0, tk.END)
473
474            # 更新数据
475            for row in range(8):
476                align_state = ho_state.get_state(row + 1)
477                self.entries[(row, 0)].insert(0, str(align_state.frame))
478                self.entries[(row, 1)].insert(0, str(align_state.timestamp))
479                self.entries[(row, 2)].insert(0, str(round(align_state.i, 2)))
480                self.entries[(row, 3)].insert(0, str(round(align_state.v, 2)))
481                self.entries[(row, 4)].insert(0, str(round(align_state.p, 2)))
482
483        time_interval = 1.0 / self._ui_frequency
484        while not self._shutdown.is_set():
485            time.sleep(time_interval)
486
487            self.root.after(0, update)
488
489
490    def __del__(self) -> None:
491        """
492        清理资源,停止线程和进程。
493        """
494        self._shutdown.set()
495        self._app_process.join()
496        self._data_thread.join()
497        if self._ui:
498            self._ui_thread.join()
HoServer(url: str, capacity=1024, ui: bool = True, ui_frequency: float = 30.0)
293    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
294        """
295        初始化 HoServer 实例。
296
297            :param url: 服务器的 URL。
298            :param capacity: 数据缓冲区的最大容量。
299            :param ui: 是否启用 UI 界面。
300            :param ui_frequency: UI 更新频率(Hz)。
301        """
302        self._url = url
303        parsed_url = urlparse(url)
304        self._host = parsed_url.hostname
305        self._port = parsed_url.port
306        self._path = parsed_url.path
307
308        self._ui = ui
309        self._ui_frequency = ui_frequency
310        self._capacity = capacity
311        self._data_buffer = []
312        """ 
313        数据缓冲区 
314        """
315
316        self._data_queue = multiprocessing.Queue()
317        self._shutdown = multiprocessing.Event()
318
319        # 启动 FastAPI 应用进程
320        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)

初始化 HoServer 实例。

:param url: 服务器的 URL。
:param capacity: 数据缓冲区的最大容量。
:param ui: 是否启用 UI 界面。
:param ui_frequency: UI 更新频率(Hz)。
def has_data(self):
333    def has_data(self):
334        """
335        检查缓冲区中是否有数据。
336
337            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
338        """
339        return len(self._data_buffer) > 0

检查缓冲区中是否有数据。

:return: 如果缓冲区中有数据,则返回 True,否则返回 False。
def get_data(self) -> HoState:
341    def get_data(self) -> HoState:
342        """
343        获取缓冲区中最新的数据。
344
345            :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
346        """
347        if not self.has_data():
348            return None
349        return self._data_buffer.pop(-1)

获取缓冲区中最新的数据。

:return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
def get_data_buffer(self) -> List[HoState]:
351    def get_data_buffer(self) -> List[HoState]:
352        """
353        获取缓冲区。
354
355        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
356
357            :return: 缓冲区。
358        """
359        return copy.deepcopy(self._data_buffer)

获取缓冲区。

注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失

:return: 缓冲区。
def length(self) -> int:
361    def length(self) -> int:
362        """
363        获取缓冲区中的数据长度。
364
365            :return: 缓冲区中的数据长度。
366        """
367        return len(self._data_buffer)

获取缓冲区中的数据长度。

:return: 缓冲区中的数据长度。
def run(self) -> None:
377    def run(self) -> None:
378        """
379        启动服务器并运行 UI 更新线程(如果启用 UI)。
380        """
381        self._app_process.start()
382
383        # 数据更新线程
384        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
385        self._data_thread.start()
386
387        if self._ui:
388            self._init_ui()
389            # UI 更新线程
390            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
391            self._ui_thread.start()
392
393            self.root.mainloop()

启动服务器并运行 UI 更新线程(如果启用 UI)。

class ManualState(enum.Enum):
502class ManualState(Enum):
503    """ 手动状态枚举 """
504    IDLE = 0
505    """ 空闲 """
506    ALIGN = 1
507    """ 对齐 """
508    SHOOT = 2
509    """ 射击 """

手动状态枚举

IDLE = <ManualState.IDLE: 0>

空闲

ALIGN = <ManualState.ALIGN: 1>

对齐

SHOOT = <ManualState.SHOOT: 2>

射击

Inherited Members
enum.Enum
name
value
class HoManual(robotengine.node.Node):
511class HoManual(Node):
512    def __init__(self, link: HoLink, name="Manual") -> None:
513        from robotengine import StateMachine
514        super().__init__(name)
515        self._link = link
516        self.state_machine = StateMachine(ManualState.IDLE, name="StateMachine")

Node 基类

HoManual(link: HoLink, name='Manual')
512    def __init__(self, link: HoLink, name="Manual") -> None:
513        from robotengine import StateMachine
514        super().__init__(name)
515        self._link = link
516        self.state_machine = StateMachine(ManualState.IDLE, name="StateMachine")

初始化节点

:param name: 节点名称
state_machine