robotengine.ho_robot

ho_robot 是 robotengine 控制 ho 机器人的节点。

ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。

如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。

ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。

使用 ho_link.update() 函数可以向机器人发送数据。

挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。

  1"""
  2
  3ho_robot 是 robotengine 控制 ho 机器人的节点。
  4
  5ho_link 与 机器人之间的通讯是自动的,在连接好设备并确定串口是正常开启后,会自动与机器人进行通讯并更新。
  6
  7如果配置了 url ho_link 节点会自动发送机器人的状态 HoState 到 url 指定的地址。
  8
  9ho_link 会不断被动地接收机器人的状态并更新,但是不会主动向机器人发送数据。
 10
 11使用 ho_link.update() 函数可以向机器人发送数据。
 12
 13挂载 ho_link 节点后,_process()的处理速度会显著受到影响,请酌情调整 engine 的运行频率。
 14"""
 15
 16from robotengine.node import Node
 17from robotengine.serial_io import SerialIO, DeviceType, CheckSumType
 18from robotengine.tools import hex2str, warning, error, info, near, vector_angle, vector_length, compute_vector_projection, find_closest_vectors
 19from robotengine.signal import Signal
 20from robotengine.timer import Timer
 21from typing import List, Tuple
 22from enum import Enum
 23import requests
 24import threading
 25import time
 26import random
 27import multiprocessing
 28import tkinter as tk
 29from ttkbootstrap import ttk
 30import ttkbootstrap as ttkb
 31from fastapi import FastAPI, Request
 32import uvicorn
 33from urllib.parse import urlparse
 34import copy
 35import math
 36import json
 37import os
 38
 39class HoMode(Enum):
 40    """ Ho 电机模态 """
 41    S = 0
 42    """ 停止 """
 43    I = 1
 44    """ 电流控制 """
 45    V = 2
 46    """ 速度控制 """
 47    P = 3
 48    """ 位置控制 """
 49
 50class AlignState:
 51    """ 帧和时间戳对齐的状态数据 """
 52    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
 53        """ 
 54        初始化对齐状态数据
 55
 56            :param id: 电机 id
 57            :param i: 电流
 58            :param v: 速度
 59            :param p: 位置
 60            :param frame: 当前帧
 61            :param timestamp: 当前时间戳 
 62        """
 63        self.id = id
 64        """ 电机 id """
 65        self.i: float = i
 66        """ 电流 """
 67        self.v: float = v
 68        """ 速度 """
 69        self.p: float = p
 70        """ 位置 """
 71        self.frame = frame
 72        """ 此状态数据对应的帧 """
 73        self.timestamp = timestamp
 74        """ 此状态数据对应的时间戳 """
 75
 76    def to_dict(self):
 77        """ 转换为字典 """
 78        return {
 79            "id": self.id,
 80            "i": self.i,
 81            "v": self.v,
 82            "p": self.p,
 83            "frame": self.frame,
 84            "timestamp": self.timestamp
 85        }
 86
 87    def __repr__(self):
 88        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"
 89    
 90class HoState:
 91    """ Ho 机器人状态 """
 92    def __init__(self, states: List[AlignState], random_state=False) -> None:
 93        """ 
 94        初始化 Ho 机器人状态
 95
 96            :param states: 帧和时间戳对齐的状态数据列表
 97            :param random_state: 是否随机生成状态数据
 98        """
 99        if not random_state:
100            self._states = states
101        else:
102            self._states = []
103            for i in range(1, 9):
104                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
105
106    def get_state(self, id: int) -> AlignState:
107        """ 
108        获取指定 id 的状态 
109        """
110        for state in self._states:
111            if state.id == id:
112                return state
113        return None
114    
115    def get_states(self) -> List[AlignState]:
116        """ 
117        获取所有状态 
118        """
119        return copy.deepcopy(self._states)
120    
121    def to_dict(self):
122        """ 
123        转换为字典 
124        """
125        return {
126            "states": [state.to_dict() for state in self._states]
127        }
128    
129    def __repr__(self):
130        state_str = ""
131        for state in self._states:
132            state_str += str(state)
133            if state != self._states[-1]:
134                state_str += "\n"
135        return f"HoState(\n{state_str})"
136
137class HoLink(Node):
138    """ Ho 机器人链接节点 """
139    def __init__(self, name="HoLink", buffer_capacity: int=1024, url=None, warn=True) -> None:
140        """ 
141        初始化 Ho 机器人链接节点 
142
143            :param name: 节点名称
144            :param buffer_capacity: 存储状态数据的缓冲区的容量
145            :param url: 数据发送的 url
146            :param read_mode: 串口读取模式
147            :param warn: 是否显示警告
148        """
149        super().__init__(name)
150        self._data_length = 84
151        self._receive_data = None
152        self._url = url
153        self._warn = warn
154        
155        if self._url:
156            self._shutdown = multiprocessing.Event()
157            self._pending_capacity = 256
158            self._pending_requests = multiprocessing.Queue()
159            self._http_process = multiprocessing.Process(target=self._http_request, daemon=True, name=self.name+"HttpProcess")
160            self._http_process.start()
161
162        self._buffer_capacity: int = buffer_capacity
163        """ 存储状态数据的缓冲区的容量 """
164        self._state_buffer: List[HoState] = []
165        """ 存储状态数据的缓冲区 """
166
167        self.sio: SerialIO = SerialIO(name="HoSerialIO", device_type=DeviceType.STM32F407, checksum_type=CheckSumType.SUM16, header=[0x0D, 0x0A], warn=warn, baudrate=1000000, timeout=1.0)
168        """ 串口节点 HoLink 会主动挂载一个已经配置好的串口节点 """
169        self.add_child(self.sio)
170
171        self.receive: Signal = Signal(bytes)
172        """ 信号,当接收到数据时触发(无论是否通过校验和) """
173        self.ho_state_update: Signal = Signal(HoState)
174        """ 信号,当接收到数据并成功通过校验和,将状态数据更新到信号参数中时触发 """
175
176    def _ready(self) -> None:
177        pass
178
179    def get_ho_state(self) -> HoState:
180        """
181        获取机器人当前最新的状态数据
182        """
183        if len(self._state_buffer) == 0:
184            return None
185        return self._state_buffer[-1]
186    
187    def get_ho_state_buffer(self) -> List[HoState]:
188        """
189        获取机器人当前的状态数据缓冲区
190        """
191        return copy.deepcopy(self._state_buffer)
192
193    def _add_pending_request(self, ho_state: HoState):
194        """ 
195        向请求队列中添加请求 
196        """
197        self._pending_requests.put(ho_state)
198        if self._pending_requests.qsize() > self._pending_capacity:
199            if self._warn:
200                warning(f"{self.name}{self._url} 发送请求时,请求队列已满,将丢弃最早的请求,可能会导致数据丢失")
201            self._pending_requests.get()
202
203    def _send_request(self, ho_state_dict: dict) -> None:
204        start_time = time.perf_counter()
205        try:
206            response = requests.post(self._url, json=ho_state_dict, timeout=0.1)
207
208            end_time = time.perf_counter()
209            latency = end_time - start_time
210            # print(f"Request latency: {round(latency * 1000, 2)} ms")
211
212        except requests.RequestException as e:
213            if self._warn:
214                warning(f"请求失败: {e}")
215        except Exception as e:
216            if self._warn:
217                warning(f"发生未知错误: {e}")
218
219    def _http_request(self):
220        info(f"{self.name} 已开启向服务地址 {self._url} 发送数据的功能")
221        while not self._shutdown.is_set():
222            if not self._pending_requests.empty():
223                ho_state = self._pending_requests.get()
224                self._send_request(ho_state.to_dict())
225
226    def update(self, id: int, mode: HoMode, i: float, v: float, p: float) -> None:
227        """ 
228        向机器人发送数据 
229        """
230        data = bytes([id]) + bytes([mode.value]) + self._encode(p, 100.0, 4) + \
231            self._encode(v, 100.0, 4) + self._encode(i, 100.0, 2)
232        # print(f"发送数据: {hex2str(data)}")
233        self.sio.transmit(data)
234
235    def _process(self, delta) -> None:
236        self._receive_data = self.sio.receive(self._data_length)
237        if self._receive_data:
238            if self.sio.check_sum(self._receive_data):
239                states = []
240                receive_data = self._receive_data[2:-2]
241
242                id = 1
243                for i in range(0, 80, 10):
244                    _data = receive_data[i:i+10]
245                    _p = self._decode(_data[0:4], 100.0, 4)
246                    _v = self._decode(_data[4:8], 100.0, 4)
247                    _i = self._decode(_data[8:10], 100.0, 2)
248
249                    align_state = AlignState(id=id, i=_i, v=_v, p=_p, frame=self.engine.get_frame(), timestamp=self.engine.get_timestamp())
250                    states.append(align_state)
251                    id += 1
252
253                ho_state = HoState(states)
254                self._state_buffer.append(ho_state)
255
256                if len(self._state_buffer) > self._buffer_capacity:
257                    self._state_buffer.pop(0)
258
259                self.ho_state_update.emit(ho_state)
260                if self._url:
261                    self._add_pending_request(ho_state)
262            else:
263                if self._warn:
264                    warning(f"{self.name} 长度为 {len(self._receive_data)} 的数据 {hex2str(self._receive_data)} 校验和错误")
265            self.receive.emit(self._receive_data)
266
267    def _encode(self, value: float, scale_factor: float, byte_length: int) -> bytes:
268        max_value = (1 << (8 * byte_length - 1))
269        max_scaled_value = max_value / scale_factor
270
271        if abs(value) >= max_scaled_value:
272            raise ValueError(f"要编码的值 {round(value, 2)} 超出范围 [-{max_scaled_value}, {max_scaled_value}]")
273
274        encoded_value = int(value * scale_factor) + max_value
275        
276        max_value_for_length = (1 << (8 * byte_length)) - 1
277        if encoded_value > max_value_for_length:
278            raise ValueError(f"编码值 {encoded_value} 超出了 {byte_length} 字节的最大值 {max_value_for_length}")
279
280        byte_data = []
281        for i in range(byte_length):
282            byte_data.insert(0, encoded_value & 0xFF)
283            encoded_value >>= 8
284
285        return bytes(byte_data)
286
287    def _decode(self, data: bytes, scale_factor: float, byte_length: int) -> float:
288        if len(data) != byte_length:
289            raise ValueError(f"数据长度 {len(data)} 与指定的字节长度 {byte_length} 不匹配")
290        max_value = (1 << (8 * byte_length - 1))
291
292        decoded_value = 0
293        for i in range(byte_length):
294            decoded_value <<= 8
295            decoded_value |= data[i]
296        
297        decoded_value -= max_value
298
299        return decoded_value / scale_factor
300    
301    # def _on_engine_exit(self):
302    #     if self._url:
303    #         self._shutdown.set()
304    #         self._http_process.join()
305            
306
307class HoServer:
308    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
309        """
310        初始化 HoServer 实例。
311
312            :param url: 服务器的 URL。
313            :param capacity: 数据缓冲区的最大容量。
314            :param ui: 是否启用 UI 界面。
315            :param ui_frequency: UI 更新频率(Hz)。
316        """
317        self._url = url
318        parsed_url = urlparse(url)
319        self._host = parsed_url.hostname
320        self._port = parsed_url.port
321        self._path = parsed_url.path
322
323        self._ui = ui
324        self._ui_frequency = ui_frequency
325        self._capacity = capacity
326        self._data_buffer = []
327        """ 
328        数据缓冲区 
329        """
330
331        self._data_queue = multiprocessing.Queue()
332        self._shutdown = multiprocessing.Event()
333
334        # 启动 FastAPI 应用进程
335        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
336
337    def _update_data(self):
338        """
339        从数据队列中读取数据并更新缓冲区。
340        """
341        while not self._shutdown.is_set():
342            if not self._data_queue.empty():
343                ho_state = self._data_queue.get()
344                self._data_buffer.append(ho_state)
345                if len(self._data_buffer) > self._capacity:
346                    self._data_buffer.pop(0)
347
348    def has_data(self) -> bool:
349        """
350        检查缓冲区中是否有数据。
351
352            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
353        """
354        return len(self._data_buffer) > 0
355
356    def get_data(self) -> HoState:
357        """
358        获取缓冲区中最新的数据。
359
360            :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
361        """
362        if not self.has_data():
363            return None
364        return self._data_buffer.pop(-1)
365    
366    def get_data_buffer(self) -> List[HoState]:
367        """
368        获取缓冲区。
369
370        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
371
372            :return: 缓冲区。
373        """
374        return copy.deepcopy(self._data_buffer)
375    
376    def length(self) -> int:
377        """
378        获取缓冲区中的数据长度。
379
380            :return: 缓冲区中的数据长度。
381        """
382        return len(self._data_buffer)
383
384    def _init_ui(self) -> None:
385        """
386        初始化 UI。
387        """
388        self.root = tk.Tk()
389        self.root.title("HoServer")
390        self.root.geometry("800x600")
391
392    def run(self) -> None:
393        """
394        启动服务器并运行 UI 更新线程(如果启用 UI)。
395        """
396        self._app_process.start()
397
398        # 数据更新线程
399        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
400        self._data_thread.start()
401
402        if self._ui:
403            self._init_ui()
404            # UI 更新线程
405            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
406            self._ui_thread.start()
407
408            self.root.mainloop()
409
410    def _run_app(self, path: str, host: str, port: int) -> None:
411        """
412        启动 FastAPI 服务器并监听请求。
413
414            :param path: API 路径。
415            :param host: 服务器主机。
416            :param port: 服务器端口。
417        """
418        app = FastAPI()
419        app.add_api_route(path, self._handle_data, methods=["POST"])
420
421        uvicorn.run(app, host=host, port=port)
422
423    async def _handle_data(self, request: Request) -> dict:
424        """
425        处理接收到的 POST 请求数据。
426
427            :param request: FastAPI 请求对象。
428            :return: 处理结果。
429        """
430        json_data = await request.json()
431        states_data = json_data.get("states", [])
432
433        states = []
434        for state_data in states_data:
435            state = AlignState(
436                id=state_data["id"],
437                i=state_data["i"],
438                v=state_data["v"],
439                p=state_data["p"],
440                frame=state_data["frame"],
441                timestamp=state_data["timestamp"]
442            )
443            states.append(state)
444        
445        ho_state = HoState(states=states)
446        self._data_queue.put(ho_state)
447        return {"message": "Data received"}
448
449    def _init_ui(self) -> None:
450        """
451        初始化 UI 界面。
452        """
453        self.root = ttkb.Window(themename="superhero", title="HoServer")
454
455        frame = ttk.Frame(self.root)
456        frame.pack(padx=10, pady=10)
457
458        columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p']
459        self.entries = {}
460
461        # 创建表头
462        for col, column_name in enumerate(columns):
463            label = ttk.Label(frame, text=column_name, width=5)
464            label.grid(row=0, column=col, padx=5, pady=5)
465
466        # 创建数据输入框
467        for row in range(8):
468            id_label = ttk.Label(frame, text=f"{row + 1}", width=5)
469            id_label.grid(row=row + 1, column=0, padx=5, pady=5)
470            for col in range(5):
471                entry = ttk.Entry(frame, width=15, state='normal')
472                entry.grid(row=row + 1, column=col + 1, padx=5, pady=10)
473                self.entries[(row, col)] = entry
474
475    def _update_ui(self) -> None:
476        """
477        根据数据缓冲区更新 UI 界面。
478        """
479        def update() -> None:
480            if len(self._data_buffer) == 0:
481                return
482            ho_state = self._data_buffer[-1]
483            
484            # 清空当前数据
485            for row in range(8):
486                for col in range(5):
487                    self.entries[(row, col)].delete(0, tk.END)
488
489            # 更新数据
490            for row in range(8):
491                align_state = ho_state.get_state(row + 1)
492                self.entries[(row, 0)].insert(0, str(align_state.frame))
493                self.entries[(row, 1)].insert(0, str(align_state.timestamp))
494                self.entries[(row, 2)].insert(0, str(round(align_state.i, 2)))
495                self.entries[(row, 3)].insert(0, str(round(align_state.v, 2)))
496                self.entries[(row, 4)].insert(0, str(round(align_state.p, 2)))
497
498        time_interval = 1.0 / self._ui_frequency
499        while not self._shutdown.is_set():
500            time.sleep(time_interval)
501
502            self.root.after(0, update)
503
504
505    def __del__(self) -> None:
506        """
507        清理资源,停止线程和进程。
508        """
509        self._shutdown.set()
510        self._app_process.join()
511        self._data_thread.join()
512        if self._ui:
513            self._ui_thread.join()
514
515
516class ManualState(Enum):
517    """ 手动状态枚举 """
518    IDLE = 0
519    """ 空闲状态,所有电机停止运动 """
520    RUNNING = 1
521    """ 运行状态 """
522    RETURNING = 2
523    """ 返回状态 """
524    ZEROING = 3
525    """ 设置零点状态 """
526
527class HoManual(Node):
528    """ Ho 机器人的手动控制节点 """
529    from robotengine import InputEvent
530    def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None:
531        """
532        初始化 HoManual 实例。
533
534            :param link: HoLink 实例。
535            :param name: 节点名称。
536            :param rotation_velocity: 旋转速度(度/秒)。
537            :param running_scale: 运行状态的缩放因子。
538            :param zeroing_scale: 设置零点状态的缩放因子。
539            :param axis_threshold: 轴的阈值。
540        """
541        from robotengine import StateMachine
542        super().__init__(name)
543        self._debug = False
544        self._valid = True
545
546        self._link = link
547        self._link.ho_state_update.connect(self._on_ho_state_update)
548        
549        self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine")
550        self.add_child(self.state_machine)
551        
552        self._zero_angles = {
553            4: 0.0,
554            5: 0.0,
555            6: 0.0,
556            7: 0.0,
557        }
558        self._zero_index = 4
559
560        self._is_tension = False
561        self._is_rotation = False
562        self._rotation_velocity = rotation_velocity
563        self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7]
564
565        self._running_scale = running_scale
566        self._zeroing_scale = zeroing_scale
567        self._axis_threshold = axis_threshold
568
569        self._before_returning = None
570
571        self.exit()
572
573    def _init(self):
574        _load_zero_angles = self._load_from_json("zero_angles.json")
575        if _load_zero_angles:
576            info("成功加载 zero_angles.json")
577            for i in range(4, 8):
578                self._zero_angles[i] = _load_zero_angles[str(i)]
579                info(f"{i}: {self._zero_angles[i]}")
580
581    def _input(self, event: InputEvent) -> None:
582        if not self._valid:
583            return
584        
585        state = self.state_machine.current_state
586
587        if event.is_action_pressed("X"):
588            self.tension(not self._is_tension)
589
590        elif event.is_action_pressed("Y"):
591            self.rotation(not self._is_rotation, self._rotation_velocity)
592
593        if state == ManualState.ZEROING:
594            if event.is_action_pressed("LEFT_SHOULDER"):
595                self._change_index(-1)
596
597            elif event.is_action_pressed("RIGHT_SHOULDER"):
598                self._change_index(1)
599
600            elif event.is_action_pressed("A"):
601                if self._debug:
602                    return
603                ho_state = self._link.get_ho_state()
604                if not ho_state:
605                    return
606                for i in range(4, 8):
607                    state = ho_state.get_state(i)
608                    self._zero_angles[i] = state.p
609                self._save_to_json("zero_angles.json", self._zero_angles)
610
611    def _change_index(self, dir: int) -> None:
612        self.lock(self._zero_index)
613        self._zero_index += dir
614        if self._zero_index > 7:
615            self._zero_index = 4
616        elif self._zero_index < 4:
617            self._zero_index = 7
618        info(f"     当前电机: {self._zero_index}")
619
620    def _on_ho_state_update(self, ho_state: HoState):
621        if not self._valid:
622            return
623        
624        state = self.state_machine.current_state
625
626        if state == ManualState.IDLE:
627            pass
628
629        elif state == ManualState.RUNNING:
630            x_value = self._threshold(self.input.get_action_strength("RIGHT_X"))
631            y_value = self._threshold(self.input.get_action_strength("LEFT_Y"))
632            self.turn(x_value, y_value, ho_state)
633
634            l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT"))
635            r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT"))
636            self.move(l_value, r_value, ho_state)
637
638        elif state == ManualState.RETURNING:
639            for i in range(4, 8):
640                self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i])
641
642        elif state == ManualState.ZEROING:
643            direction = self.input.get_action_strength("LEFT_Y")
644            direction = self._threshold(direction)
645            velocity = direction * self._zeroing_scale
646            if not self._debug:
647                self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0)
648
649    def tick(self, state: ManualState, delta: float) -> None:
650        if state == ManualState.IDLE:
651            pass
652
653        elif state == ManualState.RUNNING:
654            pass
655
656        elif state == ManualState.RETURNING:
657            pass
658
659        elif state == ManualState.ZEROING:
660            pass
661
662    def _threshold(self, value: float) -> float:
663        if abs(value) < self._axis_threshold:
664            return 0.0
665        return value
666
667    def get_next_state(self, state: ManualState) -> ManualState:
668        if state == ManualState.IDLE:
669            if self.input.is_action_pressed("START"):
670                self.input.flush_action("START")
671                return ManualState.RUNNING
672            
673            if self.input.is_action_pressed("B"):
674                self.input.flush_action("B")
675                return ManualState.RETURNING
676            
677            elif self.input.is_action_pressed("RIGHT_STICK"):
678                self.input.flush_action("RIGHT_STICK")
679                return ManualState.ZEROING
680
681        elif state == ManualState.RUNNING:
682            if self.input.is_action_pressed("START"):
683                self.input.flush_action("START")
684                return ManualState.IDLE
685            
686            if self.input.is_action_pressed("B"):
687                self.input.flush_action("B")
688                return ManualState.RETURNING
689
690        elif state == ManualState.RETURNING:
691            if self.input.is_action_pressed("B"):
692                self.input.flush_action("B")
693                return self._before_returning
694
695        elif state == ManualState.ZEROING:
696            if self.input.is_action_pressed("RIGHT_STICK"):
697                self.input.flush_action("RIGHT_STICK")
698                return ManualState.IDLE
699
700        return self.state_machine.KEEP_CURRENT
701    
702    def transition_state(self, from_state: ManualState, to_state: ManualState) -> None:
703        print("")
704        info(f"{from_state if from_state is not None else 'START'} -> {to_state}")
705        info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}")
706
707        if from_state == ManualState.IDLE:
708            pass
709
710        elif from_state == ManualState.RUNNING:
711            pass
712
713        elif from_state == ManualState.RETURNING:
714            pass
715
716        elif from_state == ManualState.ZEROING:
717            pass
718
719        info("      Y: 开关旋转")
720        info("      X: 开关张紧")
721        if to_state == ManualState.IDLE:
722            for i in range(1, 9):
723                if i == 2 or i == 3:
724                    continue
725                self.stop(i)
726            info("      START: 进入 RUNNING 状态")
727            info("      B: 进入 RETURNING 状态")
728            info("      RIGHT_STICK: 进入 ZEROING 状态")
729
730        elif to_state == ManualState.RUNNING:
731            for i in range(1, 9):
732                if i == 2 or i == 3:
733                    continue
734                self.lock(i)
735            info("      START: 返回 IDLE 状态")
736            info("      B: 进入 RETURNING 状态")
737    
738        elif to_state == ManualState.RETURNING:
739            self.lock(1)
740            self._before_returning = from_state
741            info("      B: 返回之前的状态")
742
743        elif to_state == ManualState.ZEROING:
744            for i in range(1, 9):
745                if i == 2 or i == 3:
746                    continue
747                self.lock(i)
748            info("      RIGHT_STICK: 返回 IDLE 状态")
749            info("      LEFT_SHOULDER: 切换到上一个电机")
750            info("      RIGHT_SHOULDER: 切换到下一个电机")
751            info("      A: 保存当前位置为零点")
752            info(f"      当前电机: {self._zero_index}")
753    
754    def lock(self, id: int) -> None:
755        """
756        锁定指定的电机。
757
758            :param id: 电机编号
759        """
760        if self._debug:
761            info(f"{self.name} 锁定电机 {id}")
762            return
763        self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)
764
765    def lock_all(self) -> None:
766        """
767        锁定所有的电机。
768        """
769        for i in range(1, 9):
770            self.lock(i)
771
772    def stop(self, id: int) -> None:
773        """
774        停止指定的电机。
775
776            :param id: 电机编号
777        """
778        if self._debug:
779            info(f"{self.name} 停止电机 {id}")
780            return
781        self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)
782
783    def stop_all(self) -> None:
784        """
785        停止所有的电机。
786        """
787        for i in range(1, 9):
788            self.stop(i)
789
790    def tension(self, on: bool, i: float=0.8) -> None:
791        """
792        驱动牵引电机,张紧导管
793
794            :param on: 是否开启牵引
795            :param i: 牵引电流(A)
796        """
797        self._is_tension = on
798        if on:
799            self._link.update(2, HoMode.V, i, -360.0, 0.0)
800            self._link.update(3, HoMode.V, i, 360.0, 0.0)
801        else:
802            self.stop(2)
803            self.stop(3)
804
805    def rotation(self, on: bool, velocity: float = 360.0) -> None:
806        """
807        驱动旋转电机,旋转换能器
808
809            :param on: 是否开启旋转
810            :param velocity: 旋转速度(度/秒)
811        """
812        self._is_rotation = on
813        if on:
814            self._link.update(8, HoMode.V, 2.0, velocity, 0.0)
815        else:
816            self.stop(8)
817
818    def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None:
819        """
820        驱动转向电机,转向导管
821
822            :param x_value: 横向控制值
823            :param y_value: 纵向控制值
824            :param ho_state: Ho 机器人状态
825        """
826        if x_value == 0 and y_value == 0:
827            for i in range(4, 8):
828                self._link.update(i, HoMode.V, 2.0, 0.0, 0.0)
829        else:
830            projection = compute_vector_projection(x_value, y_value, self._base_angles)
831            control_values = [v * self._running_scale for v in projection]
832
833            for i in range(4, 8):
834                if control_values[i-4] > 0:
835                    a_id = i
836                    b_id = (i + 2) % 4 + 4
837                    a_state = ho_state.get_state(a_id)
838                    b_state = ho_state.get_state(b_id)
839                    a_near = near(a_state.p, self._zero_angles[a_id])
840                    b_near = near(b_state.p, self._zero_angles[b_id])
841
842                    if a_near and not b_near:
843                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
844                    elif (not a_near and b_near) or (a_near and b_near):
845                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
846                    elif not a_near and not b_near:
847                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
848                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
849
850    def move(self, l_value: float, r_value: float, ho_state: HoState) -> None:
851        """
852        驱动移动电机,移动导管
853
854            :param l_value: 左侧移动控制值
855            :param r_value: 右侧移动控制值
856            :param ho_state: Ho 机器人状态
857        """
858        if l_value != 0 and r_value != 0:
859            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
860
861        elif l_value != 0 and r_value== 0:
862            self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0)
863
864        elif r_value != 0 and l_value== 0:
865            self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0)
866
867        else:
868            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
869
870    def is_running(self) -> bool:
871        """
872        判断当前节点是否处于运行状态。
873
874            :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
875        """
876        return self._valid
877
878    def enter(self) -> None:
879        """
880        进入节点。
881        """
882        self.state_machine.freeze = False
883        self.state_machine.first_tick = True
884        self.state_machine.current_state = ManualState.IDLE
885        self._is_rotation = False
886        self._is_tension = False
887        self._valid = True
888
889    def exit(self) -> None:
890        """
891        退出节点。
892        """
893        self.state_machine.freeze = True
894        self.state_machine.first_tick = True
895        self.state_machine.current_state = ManualState.IDLE
896        self._valid = False
897        self._is_rotation = False
898        self._is_tension = False
899        self.stop_all()
900
901    def _save_to_json(self, file_name, data):
902        with open(file_name, 'w') as f:
903            json.dump(data, f)
904        info(f"     {self.name} 保存 {file_name} 成功")
905
906    def _load_from_json(self, file_name):
907        if not os.path.exists(file_name):
908            warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值")
909            return None
910        with open(file_name, 'r') as f:
911            return json.load(f)
class HoMode(enum.Enum):
40class HoMode(Enum):
41    """ Ho 电机模态 """
42    S = 0
43    """ 停止 """
44    I = 1
45    """ 电流控制 """
46    V = 2
47    """ 速度控制 """
48    P = 3
49    """ 位置控制 """

Ho 电机模态

S = <HoMode.S: 0>

停止

I = <HoMode.I: 1>

电流控制

V = <HoMode.V: 2>

速度控制

P = <HoMode.P: 3>

位置控制

Inherited Members
enum.Enum
name
value
class AlignState:
51class AlignState:
52    """ 帧和时间戳对齐的状态数据 """
53    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
54        """ 
55        初始化对齐状态数据
56
57            :param id: 电机 id
58            :param i: 电流
59            :param v: 速度
60            :param p: 位置
61            :param frame: 当前帧
62            :param timestamp: 当前时间戳 
63        """
64        self.id = id
65        """ 电机 id """
66        self.i: float = i
67        """ 电流 """
68        self.v: float = v
69        """ 速度 """
70        self.p: float = p
71        """ 位置 """
72        self.frame = frame
73        """ 此状态数据对应的帧 """
74        self.timestamp = timestamp
75        """ 此状态数据对应的时间戳 """
76
77    def to_dict(self):
78        """ 转换为字典 """
79        return {
80            "id": self.id,
81            "i": self.i,
82            "v": self.v,
83            "p": self.p,
84            "frame": self.frame,
85            "timestamp": self.timestamp
86        }
87
88    def __repr__(self):
89        return f"AlignState(id={self.id}, i={round(self.i, 2)}, v={round(self.v, 2)}, p={round(self.p, 2)}, frame={self.frame}, timestamp={round(self.timestamp, 2)})"

帧和时间戳对齐的状态数据

AlignState(id: int, i: float, v: float, p: float, frame: int, timestamp: float)
53    def __init__(self, id: int, i: float, v: float, p: float, frame: int, timestamp: float) -> None:
54        """ 
55        初始化对齐状态数据
56
57            :param id: 电机 id
58            :param i: 电流
59            :param v: 速度
60            :param p: 位置
61            :param frame: 当前帧
62            :param timestamp: 当前时间戳 
63        """
64        self.id = id
65        """ 电机 id """
66        self.i: float = i
67        """ 电流 """
68        self.v: float = v
69        """ 速度 """
70        self.p: float = p
71        """ 位置 """
72        self.frame = frame
73        """ 此状态数据对应的帧 """
74        self.timestamp = timestamp
75        """ 此状态数据对应的时间戳 """

初始化对齐状态数据

:param id: 电机 id
:param i: 电流
:param v: 速度
:param p: 位置
:param frame: 当前帧
:param timestamp: 当前时间戳
id

电机 id

i: float

电流

v: float

速度

p: float

位置

frame

此状态数据对应的帧

timestamp

此状态数据对应的时间戳

def to_dict(self):
77    def to_dict(self):
78        """ 转换为字典 """
79        return {
80            "id": self.id,
81            "i": self.i,
82            "v": self.v,
83            "p": self.p,
84            "frame": self.frame,
85            "timestamp": self.timestamp
86        }

转换为字典

class HoState:
 91class HoState:
 92    """ Ho 机器人状态 """
 93    def __init__(self, states: List[AlignState], random_state=False) -> None:
 94        """ 
 95        初始化 Ho 机器人状态
 96
 97            :param states: 帧和时间戳对齐的状态数据列表
 98            :param random_state: 是否随机生成状态数据
 99        """
100        if not random_state:
101            self._states = states
102        else:
103            self._states = []
104            for i in range(1, 9):
105                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))
106
107    def get_state(self, id: int) -> AlignState:
108        """ 
109        获取指定 id 的状态 
110        """
111        for state in self._states:
112            if state.id == id:
113                return state
114        return None
115    
116    def get_states(self) -> List[AlignState]:
117        """ 
118        获取所有状态 
119        """
120        return copy.deepcopy(self._states)
121    
122    def to_dict(self):
123        """ 
124        转换为字典 
125        """
126        return {
127            "states": [state.to_dict() for state in self._states]
128        }
129    
130    def __repr__(self):
131        state_str = ""
132        for state in self._states:
133            state_str += str(state)
134            if state != self._states[-1]:
135                state_str += "\n"
136        return f"HoState(\n{state_str})"

Ho 机器人状态

HoState(states: List[AlignState], random_state=False)
 93    def __init__(self, states: List[AlignState], random_state=False) -> None:
 94        """ 
 95        初始化 Ho 机器人状态
 96
 97            :param states: 帧和时间戳对齐的状态数据列表
 98            :param random_state: 是否随机生成状态数据
 99        """
100        if not random_state:
101            self._states = states
102        else:
103            self._states = []
104            for i in range(1, 9):
105                self._states.append(AlignState(i, random.uniform(-1.0, 1.0), random.uniform(-360.0, 360.0), random.uniform(-1000.0, 1000.0), 0, 0.0))

初始化 Ho 机器人状态

:param states: 帧和时间戳对齐的状态数据列表
:param random_state: 是否随机生成状态数据
def get_state(self, id: int) -> AlignState:
107    def get_state(self, id: int) -> AlignState:
108        """ 
109        获取指定 id 的状态 
110        """
111        for state in self._states:
112            if state.id == id:
113                return state
114        return None

获取指定 id 的状态

def get_states(self) -> List[AlignState]:
116    def get_states(self) -> List[AlignState]:
117        """ 
118        获取所有状态 
119        """
120        return copy.deepcopy(self._states)

获取所有状态

def to_dict(self):
122    def to_dict(self):
123        """ 
124        转换为字典 
125        """
126        return {
127            "states": [state.to_dict() for state in self._states]
128        }

转换为字典

class HoServer:
308class HoServer:
309    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
310        """
311        初始化 HoServer 实例。
312
313            :param url: 服务器的 URL。
314            :param capacity: 数据缓冲区的最大容量。
315            :param ui: 是否启用 UI 界面。
316            :param ui_frequency: UI 更新频率(Hz)。
317        """
318        self._url = url
319        parsed_url = urlparse(url)
320        self._host = parsed_url.hostname
321        self._port = parsed_url.port
322        self._path = parsed_url.path
323
324        self._ui = ui
325        self._ui_frequency = ui_frequency
326        self._capacity = capacity
327        self._data_buffer = []
328        """ 
329        数据缓冲区 
330        """
331
332        self._data_queue = multiprocessing.Queue()
333        self._shutdown = multiprocessing.Event()
334
335        # 启动 FastAPI 应用进程
336        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)
337
338    def _update_data(self):
339        """
340        从数据队列中读取数据并更新缓冲区。
341        """
342        while not self._shutdown.is_set():
343            if not self._data_queue.empty():
344                ho_state = self._data_queue.get()
345                self._data_buffer.append(ho_state)
346                if len(self._data_buffer) > self._capacity:
347                    self._data_buffer.pop(0)
348
349    def has_data(self) -> bool:
350        """
351        检查缓冲区中是否有数据。
352
353            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
354        """
355        return len(self._data_buffer) > 0
356
357    def get_data(self) -> HoState:
358        """
359        获取缓冲区中最新的数据。
360
361            :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
362        """
363        if not self.has_data():
364            return None
365        return self._data_buffer.pop(-1)
366    
367    def get_data_buffer(self) -> List[HoState]:
368        """
369        获取缓冲区。
370
371        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
372
373            :return: 缓冲区。
374        """
375        return copy.deepcopy(self._data_buffer)
376    
377    def length(self) -> int:
378        """
379        获取缓冲区中的数据长度。
380
381            :return: 缓冲区中的数据长度。
382        """
383        return len(self._data_buffer)
384
385    def _init_ui(self) -> None:
386        """
387        初始化 UI。
388        """
389        self.root = tk.Tk()
390        self.root.title("HoServer")
391        self.root.geometry("800x600")
392
393    def run(self) -> None:
394        """
395        启动服务器并运行 UI 更新线程(如果启用 UI)。
396        """
397        self._app_process.start()
398
399        # 数据更新线程
400        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
401        self._data_thread.start()
402
403        if self._ui:
404            self._init_ui()
405            # UI 更新线程
406            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
407            self._ui_thread.start()
408
409            self.root.mainloop()
410
411    def _run_app(self, path: str, host: str, port: int) -> None:
412        """
413        启动 FastAPI 服务器并监听请求。
414
415            :param path: API 路径。
416            :param host: 服务器主机。
417            :param port: 服务器端口。
418        """
419        app = FastAPI()
420        app.add_api_route(path, self._handle_data, methods=["POST"])
421
422        uvicorn.run(app, host=host, port=port)
423
424    async def _handle_data(self, request: Request) -> dict:
425        """
426        处理接收到的 POST 请求数据。
427
428            :param request: FastAPI 请求对象。
429            :return: 处理结果。
430        """
431        json_data = await request.json()
432        states_data = json_data.get("states", [])
433
434        states = []
435        for state_data in states_data:
436            state = AlignState(
437                id=state_data["id"],
438                i=state_data["i"],
439                v=state_data["v"],
440                p=state_data["p"],
441                frame=state_data["frame"],
442                timestamp=state_data["timestamp"]
443            )
444            states.append(state)
445        
446        ho_state = HoState(states=states)
447        self._data_queue.put(ho_state)
448        return {"message": "Data received"}
449
450    def _init_ui(self) -> None:
451        """
452        初始化 UI 界面。
453        """
454        self.root = ttkb.Window(themename="superhero", title="HoServer")
455
456        frame = ttk.Frame(self.root)
457        frame.pack(padx=10, pady=10)
458
459        columns = ['Id', 'Frame', 'Timestamp', 'i', 'v', 'p']
460        self.entries = {}
461
462        # 创建表头
463        for col, column_name in enumerate(columns):
464            label = ttk.Label(frame, text=column_name, width=5)
465            label.grid(row=0, column=col, padx=5, pady=5)
466
467        # 创建数据输入框
468        for row in range(8):
469            id_label = ttk.Label(frame, text=f"{row + 1}", width=5)
470            id_label.grid(row=row + 1, column=0, padx=5, pady=5)
471            for col in range(5):
472                entry = ttk.Entry(frame, width=15, state='normal')
473                entry.grid(row=row + 1, column=col + 1, padx=5, pady=10)
474                self.entries[(row, col)] = entry
475
476    def _update_ui(self) -> None:
477        """
478        根据数据缓冲区更新 UI 界面。
479        """
480        def update() -> None:
481            if len(self._data_buffer) == 0:
482                return
483            ho_state = self._data_buffer[-1]
484            
485            # 清空当前数据
486            for row in range(8):
487                for col in range(5):
488                    self.entries[(row, col)].delete(0, tk.END)
489
490            # 更新数据
491            for row in range(8):
492                align_state = ho_state.get_state(row + 1)
493                self.entries[(row, 0)].insert(0, str(align_state.frame))
494                self.entries[(row, 1)].insert(0, str(align_state.timestamp))
495                self.entries[(row, 2)].insert(0, str(round(align_state.i, 2)))
496                self.entries[(row, 3)].insert(0, str(round(align_state.v, 2)))
497                self.entries[(row, 4)].insert(0, str(round(align_state.p, 2)))
498
499        time_interval = 1.0 / self._ui_frequency
500        while not self._shutdown.is_set():
501            time.sleep(time_interval)
502
503            self.root.after(0, update)
504
505
506    def __del__(self) -> None:
507        """
508        清理资源,停止线程和进程。
509        """
510        self._shutdown.set()
511        self._app_process.join()
512        self._data_thread.join()
513        if self._ui:
514            self._ui_thread.join()
HoServer(url: str, capacity=1024, ui: bool = True, ui_frequency: float = 30.0)
309    def __init__(self, url: str, capacity=1024, ui: bool=True, ui_frequency: float=30.0) -> None:
310        """
311        初始化 HoServer 实例。
312
313            :param url: 服务器的 URL。
314            :param capacity: 数据缓冲区的最大容量。
315            :param ui: 是否启用 UI 界面。
316            :param ui_frequency: UI 更新频率(Hz)。
317        """
318        self._url = url
319        parsed_url = urlparse(url)
320        self._host = parsed_url.hostname
321        self._port = parsed_url.port
322        self._path = parsed_url.path
323
324        self._ui = ui
325        self._ui_frequency = ui_frequency
326        self._capacity = capacity
327        self._data_buffer = []
328        """ 
329        数据缓冲区 
330        """
331
332        self._data_queue = multiprocessing.Queue()
333        self._shutdown = multiprocessing.Event()
334
335        # 启动 FastAPI 应用进程
336        self._app_process = multiprocessing.Process(target=self._run_app, args=(self._path, self._host, self._port), daemon=True)

初始化 HoServer 实例。

:param url: 服务器的 URL。
:param capacity: 数据缓冲区的最大容量。
:param ui: 是否启用 UI 界面。
:param ui_frequency: UI 更新频率(Hz)。
def has_data(self) -> bool:
349    def has_data(self) -> bool:
350        """
351        检查缓冲区中是否有数据。
352
353            :return: 如果缓冲区中有数据,则返回 True,否则返回 False。
354        """
355        return len(self._data_buffer) > 0

检查缓冲区中是否有数据。

:return: 如果缓冲区中有数据,则返回 True,否则返回 False。
def get_data(self) -> HoState:
357    def get_data(self) -> HoState:
358        """
359        获取缓冲区中最新的数据。
360
361            :return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
362        """
363        if not self.has_data():
364            return None
365        return self._data_buffer.pop(-1)

获取缓冲区中最新的数据。

:return: 缓冲区中最新的数据,如果缓冲区为空,则返回 None。
def get_data_buffer(self) -> List[HoState]:
367    def get_data_buffer(self) -> List[HoState]:
368        """
369        获取缓冲区。
370
371        注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失
372
373            :return: 缓冲区。
374        """
375        return copy.deepcopy(self._data_buffer)

获取缓冲区。

注意:若需要从数据缓冲区中读取数据,请尽快取出,否则缓冲区溢出后最开始的数据会丢失

:return: 缓冲区。
def length(self) -> int:
377    def length(self) -> int:
378        """
379        获取缓冲区中的数据长度。
380
381            :return: 缓冲区中的数据长度。
382        """
383        return len(self._data_buffer)

获取缓冲区中的数据长度。

:return: 缓冲区中的数据长度。
def run(self) -> None:
393    def run(self) -> None:
394        """
395        启动服务器并运行 UI 更新线程(如果启用 UI)。
396        """
397        self._app_process.start()
398
399        # 数据更新线程
400        self._data_thread = threading.Thread(target=self._update_data, daemon=True)
401        self._data_thread.start()
402
403        if self._ui:
404            self._init_ui()
405            # UI 更新线程
406            self._ui_thread = threading.Thread(target=self._update_ui, daemon=True)
407            self._ui_thread.start()
408
409            self.root.mainloop()

启动服务器并运行 UI 更新线程(如果启用 UI)。

class ManualState(enum.Enum):
517class ManualState(Enum):
518    """ 手动状态枚举 """
519    IDLE = 0
520    """ 空闲状态,所有电机停止运动 """
521    RUNNING = 1
522    """ 运行状态 """
523    RETURNING = 2
524    """ 返回状态 """
525    ZEROING = 3
526    """ 设置零点状态 """

手动状态枚举

IDLE = <ManualState.IDLE: 0>

空闲状态,所有电机停止运动

RUNNING = <ManualState.RUNNING: 1>

运行状态

RETURNING = <ManualState.RETURNING: 2>

返回状态

ZEROING = <ManualState.ZEROING: 3>

设置零点状态

Inherited Members
enum.Enum
name
value
class HoManual(robotengine.node.Node):
528class HoManual(Node):
529    """ Ho 机器人的手动控制节点 """
530    from robotengine import InputEvent
531    def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None:
532        """
533        初始化 HoManual 实例。
534
535            :param link: HoLink 实例。
536            :param name: 节点名称。
537            :param rotation_velocity: 旋转速度(度/秒)。
538            :param running_scale: 运行状态的缩放因子。
539            :param zeroing_scale: 设置零点状态的缩放因子。
540            :param axis_threshold: 轴的阈值。
541        """
542        from robotengine import StateMachine
543        super().__init__(name)
544        self._debug = False
545        self._valid = True
546
547        self._link = link
548        self._link.ho_state_update.connect(self._on_ho_state_update)
549        
550        self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine")
551        self.add_child(self.state_machine)
552        
553        self._zero_angles = {
554            4: 0.0,
555            5: 0.0,
556            6: 0.0,
557            7: 0.0,
558        }
559        self._zero_index = 4
560
561        self._is_tension = False
562        self._is_rotation = False
563        self._rotation_velocity = rotation_velocity
564        self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7]
565
566        self._running_scale = running_scale
567        self._zeroing_scale = zeroing_scale
568        self._axis_threshold = axis_threshold
569
570        self._before_returning = None
571
572        self.exit()
573
574    def _init(self):
575        _load_zero_angles = self._load_from_json("zero_angles.json")
576        if _load_zero_angles:
577            info("成功加载 zero_angles.json")
578            for i in range(4, 8):
579                self._zero_angles[i] = _load_zero_angles[str(i)]
580                info(f"{i}: {self._zero_angles[i]}")
581
582    def _input(self, event: InputEvent) -> None:
583        if not self._valid:
584            return
585        
586        state = self.state_machine.current_state
587
588        if event.is_action_pressed("X"):
589            self.tension(not self._is_tension)
590
591        elif event.is_action_pressed("Y"):
592            self.rotation(not self._is_rotation, self._rotation_velocity)
593
594        if state == ManualState.ZEROING:
595            if event.is_action_pressed("LEFT_SHOULDER"):
596                self._change_index(-1)
597
598            elif event.is_action_pressed("RIGHT_SHOULDER"):
599                self._change_index(1)
600
601            elif event.is_action_pressed("A"):
602                if self._debug:
603                    return
604                ho_state = self._link.get_ho_state()
605                if not ho_state:
606                    return
607                for i in range(4, 8):
608                    state = ho_state.get_state(i)
609                    self._zero_angles[i] = state.p
610                self._save_to_json("zero_angles.json", self._zero_angles)
611
612    def _change_index(self, dir: int) -> None:
613        self.lock(self._zero_index)
614        self._zero_index += dir
615        if self._zero_index > 7:
616            self._zero_index = 4
617        elif self._zero_index < 4:
618            self._zero_index = 7
619        info(f"     当前电机: {self._zero_index}")
620
621    def _on_ho_state_update(self, ho_state: HoState):
622        if not self._valid:
623            return
624        
625        state = self.state_machine.current_state
626
627        if state == ManualState.IDLE:
628            pass
629
630        elif state == ManualState.RUNNING:
631            x_value = self._threshold(self.input.get_action_strength("RIGHT_X"))
632            y_value = self._threshold(self.input.get_action_strength("LEFT_Y"))
633            self.turn(x_value, y_value, ho_state)
634
635            l_value = self._threshold(self.input.get_action_strength("TRIGGER_LEFT"))
636            r_value = self._threshold(self.input.get_action_strength("TRIGGER_RIGHT"))
637            self.move(l_value, r_value, ho_state)
638
639        elif state == ManualState.RETURNING:
640            for i in range(4, 8):
641                self._link.update(i, HoMode.P, 2.0, 100.0, self._zero_angles[i])
642
643        elif state == ManualState.ZEROING:
644            direction = self.input.get_action_strength("LEFT_Y")
645            direction = self._threshold(direction)
646            velocity = direction * self._zeroing_scale
647            if not self._debug:
648                self._link.update(self._zero_index, HoMode.V, 2.0, velocity, 0.0)
649
650    def tick(self, state: ManualState, delta: float) -> None:
651        if state == ManualState.IDLE:
652            pass
653
654        elif state == ManualState.RUNNING:
655            pass
656
657        elif state == ManualState.RETURNING:
658            pass
659
660        elif state == ManualState.ZEROING:
661            pass
662
663    def _threshold(self, value: float) -> float:
664        if abs(value) < self._axis_threshold:
665            return 0.0
666        return value
667
668    def get_next_state(self, state: ManualState) -> ManualState:
669        if state == ManualState.IDLE:
670            if self.input.is_action_pressed("START"):
671                self.input.flush_action("START")
672                return ManualState.RUNNING
673            
674            if self.input.is_action_pressed("B"):
675                self.input.flush_action("B")
676                return ManualState.RETURNING
677            
678            elif self.input.is_action_pressed("RIGHT_STICK"):
679                self.input.flush_action("RIGHT_STICK")
680                return ManualState.ZEROING
681
682        elif state == ManualState.RUNNING:
683            if self.input.is_action_pressed("START"):
684                self.input.flush_action("START")
685                return ManualState.IDLE
686            
687            if self.input.is_action_pressed("B"):
688                self.input.flush_action("B")
689                return ManualState.RETURNING
690
691        elif state == ManualState.RETURNING:
692            if self.input.is_action_pressed("B"):
693                self.input.flush_action("B")
694                return self._before_returning
695
696        elif state == ManualState.ZEROING:
697            if self.input.is_action_pressed("RIGHT_STICK"):
698                self.input.flush_action("RIGHT_STICK")
699                return ManualState.IDLE
700
701        return self.state_machine.KEEP_CURRENT
702    
703    def transition_state(self, from_state: ManualState, to_state: ManualState) -> None:
704        print("")
705        info(f"{from_state if from_state is not None else 'START'} -> {to_state}")
706        info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}")
707
708        if from_state == ManualState.IDLE:
709            pass
710
711        elif from_state == ManualState.RUNNING:
712            pass
713
714        elif from_state == ManualState.RETURNING:
715            pass
716
717        elif from_state == ManualState.ZEROING:
718            pass
719
720        info("      Y: 开关旋转")
721        info("      X: 开关张紧")
722        if to_state == ManualState.IDLE:
723            for i in range(1, 9):
724                if i == 2 or i == 3:
725                    continue
726                self.stop(i)
727            info("      START: 进入 RUNNING 状态")
728            info("      B: 进入 RETURNING 状态")
729            info("      RIGHT_STICK: 进入 ZEROING 状态")
730
731        elif to_state == ManualState.RUNNING:
732            for i in range(1, 9):
733                if i == 2 or i == 3:
734                    continue
735                self.lock(i)
736            info("      START: 返回 IDLE 状态")
737            info("      B: 进入 RETURNING 状态")
738    
739        elif to_state == ManualState.RETURNING:
740            self.lock(1)
741            self._before_returning = from_state
742            info("      B: 返回之前的状态")
743
744        elif to_state == ManualState.ZEROING:
745            for i in range(1, 9):
746                if i == 2 or i == 3:
747                    continue
748                self.lock(i)
749            info("      RIGHT_STICK: 返回 IDLE 状态")
750            info("      LEFT_SHOULDER: 切换到上一个电机")
751            info("      RIGHT_SHOULDER: 切换到下一个电机")
752            info("      A: 保存当前位置为零点")
753            info(f"      当前电机: {self._zero_index}")
754    
755    def lock(self, id: int) -> None:
756        """
757        锁定指定的电机。
758
759            :param id: 电机编号
760        """
761        if self._debug:
762            info(f"{self.name} 锁定电机 {id}")
763            return
764        self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)
765
766    def lock_all(self) -> None:
767        """
768        锁定所有的电机。
769        """
770        for i in range(1, 9):
771            self.lock(i)
772
773    def stop(self, id: int) -> None:
774        """
775        停止指定的电机。
776
777            :param id: 电机编号
778        """
779        if self._debug:
780            info(f"{self.name} 停止电机 {id}")
781            return
782        self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)
783
784    def stop_all(self) -> None:
785        """
786        停止所有的电机。
787        """
788        for i in range(1, 9):
789            self.stop(i)
790
791    def tension(self, on: bool, i: float=0.8) -> None:
792        """
793        驱动牵引电机,张紧导管
794
795            :param on: 是否开启牵引
796            :param i: 牵引电流(A)
797        """
798        self._is_tension = on
799        if on:
800            self._link.update(2, HoMode.V, i, -360.0, 0.0)
801            self._link.update(3, HoMode.V, i, 360.0, 0.0)
802        else:
803            self.stop(2)
804            self.stop(3)
805
806    def rotation(self, on: bool, velocity: float = 360.0) -> None:
807        """
808        驱动旋转电机,旋转换能器
809
810            :param on: 是否开启旋转
811            :param velocity: 旋转速度(度/秒)
812        """
813        self._is_rotation = on
814        if on:
815            self._link.update(8, HoMode.V, 2.0, velocity, 0.0)
816        else:
817            self.stop(8)
818
819    def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None:
820        """
821        驱动转向电机,转向导管
822
823            :param x_value: 横向控制值
824            :param y_value: 纵向控制值
825            :param ho_state: Ho 机器人状态
826        """
827        if x_value == 0 and y_value == 0:
828            for i in range(4, 8):
829                self._link.update(i, HoMode.V, 2.0, 0.0, 0.0)
830        else:
831            projection = compute_vector_projection(x_value, y_value, self._base_angles)
832            control_values = [v * self._running_scale for v in projection]
833
834            for i in range(4, 8):
835                if control_values[i-4] > 0:
836                    a_id = i
837                    b_id = (i + 2) % 4 + 4
838                    a_state = ho_state.get_state(a_id)
839                    b_state = ho_state.get_state(b_id)
840                    a_near = near(a_state.p, self._zero_angles[a_id])
841                    b_near = near(b_state.p, self._zero_angles[b_id])
842
843                    if a_near and not b_near:
844                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
845                    elif (not a_near and b_near) or (a_near and b_near):
846                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
847                    elif not a_near and not b_near:
848                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
849                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
850
851    def move(self, l_value: float, r_value: float, ho_state: HoState) -> None:
852        """
853        驱动移动电机,移动导管
854
855            :param l_value: 左侧移动控制值
856            :param r_value: 右侧移动控制值
857            :param ho_state: Ho 机器人状态
858        """
859        if l_value != 0 and r_value != 0:
860            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
861
862        elif l_value != 0 and r_value== 0:
863            self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0)
864
865        elif r_value != 0 and l_value== 0:
866            self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0)
867
868        else:
869            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
870
871    def is_running(self) -> bool:
872        """
873        判断当前节点是否处于运行状态。
874
875            :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
876        """
877        return self._valid
878
879    def enter(self) -> None:
880        """
881        进入节点。
882        """
883        self.state_machine.freeze = False
884        self.state_machine.first_tick = True
885        self.state_machine.current_state = ManualState.IDLE
886        self._is_rotation = False
887        self._is_tension = False
888        self._valid = True
889
890    def exit(self) -> None:
891        """
892        退出节点。
893        """
894        self.state_machine.freeze = True
895        self.state_machine.first_tick = True
896        self.state_machine.current_state = ManualState.IDLE
897        self._valid = False
898        self._is_rotation = False
899        self._is_tension = False
900        self.stop_all()
901
902    def _save_to_json(self, file_name, data):
903        with open(file_name, 'w') as f:
904            json.dump(data, f)
905        info(f"     {self.name} 保存 {file_name} 成功")
906
907    def _load_from_json(self, file_name):
908        if not os.path.exists(file_name):
909            warning(f"{file_name} 不存在,无法读取 zero_angles 将使用 0.0 作为初始值")
910            return None
911        with open(file_name, 'r') as f:
912            return json.load(f)

Ho 机器人的手动控制节点

HoManual( link: HoLink, name='Manual', rotation_velocity: float = 360.0, running_scale: float = 100.0, zeroing_scale: float = 100.0, axis_threshold: float = 0.1)
531    def __init__(self, link: HoLink, name="Manual", rotation_velocity: float = 360.0, running_scale: float=100.0, zeroing_scale: float=100.0, axis_threshold: float=0.1) -> None:
532        """
533        初始化 HoManual 实例。
534
535            :param link: HoLink 实例。
536            :param name: 节点名称。
537            :param rotation_velocity: 旋转速度(度/秒)。
538            :param running_scale: 运行状态的缩放因子。
539            :param zeroing_scale: 设置零点状态的缩放因子。
540            :param axis_threshold: 轴的阈值。
541        """
542        from robotengine import StateMachine
543        super().__init__(name)
544        self._debug = False
545        self._valid = True
546
547        self._link = link
548        self._link.ho_state_update.connect(self._on_ho_state_update)
549        
550        self.state_machine = StateMachine(ManualState.IDLE, name="HoManualStateMachine")
551        self.add_child(self.state_machine)
552        
553        self._zero_angles = {
554            4: 0.0,
555            5: 0.0,
556            6: 0.0,
557            7: 0.0,
558        }
559        self._zero_index = 4
560
561        self._is_tension = False
562        self._is_rotation = False
563        self._rotation_velocity = rotation_velocity
564        self._base_angles = [math.pi / 4, math.pi / 4 * 3, math.pi / 4 * 5, math.pi / 4 * 7]
565
566        self._running_scale = running_scale
567        self._zeroing_scale = zeroing_scale
568        self._axis_threshold = axis_threshold
569
570        self._before_returning = None
571
572        self.exit()

初始化 HoManual 实例。

:param link: HoLink 实例。
:param name: 节点名称。
:param rotation_velocity: 旋转速度(度/秒)。
:param running_scale: 运行状态的缩放因子。
:param zeroing_scale: 设置零点状态的缩放因子。
:param axis_threshold: 轴的阈值。
state_machine
def tick(self, state: ManualState, delta: float) -> None:
650    def tick(self, state: ManualState, delta: float) -> None:
651        if state == ManualState.IDLE:
652            pass
653
654        elif state == ManualState.RUNNING:
655            pass
656
657        elif state == ManualState.RETURNING:
658            pass
659
660        elif state == ManualState.ZEROING:
661            pass
def get_next_state( self, state: ManualState) -> ManualState:
668    def get_next_state(self, state: ManualState) -> ManualState:
669        if state == ManualState.IDLE:
670            if self.input.is_action_pressed("START"):
671                self.input.flush_action("START")
672                return ManualState.RUNNING
673            
674            if self.input.is_action_pressed("B"):
675                self.input.flush_action("B")
676                return ManualState.RETURNING
677            
678            elif self.input.is_action_pressed("RIGHT_STICK"):
679                self.input.flush_action("RIGHT_STICK")
680                return ManualState.ZEROING
681
682        elif state == ManualState.RUNNING:
683            if self.input.is_action_pressed("START"):
684                self.input.flush_action("START")
685                return ManualState.IDLE
686            
687            if self.input.is_action_pressed("B"):
688                self.input.flush_action("B")
689                return ManualState.RETURNING
690
691        elif state == ManualState.RETURNING:
692            if self.input.is_action_pressed("B"):
693                self.input.flush_action("B")
694                return self._before_returning
695
696        elif state == ManualState.ZEROING:
697            if self.input.is_action_pressed("RIGHT_STICK"):
698                self.input.flush_action("RIGHT_STICK")
699                return ManualState.IDLE
700
701        return self.state_machine.KEEP_CURRENT
def transition_state( self, from_state: ManualState, to_state: ManualState) -> None:
703    def transition_state(self, from_state: ManualState, to_state: ManualState) -> None:
704        print("")
705        info(f"{from_state if from_state is not None else 'START'} -> {to_state}")
706        info(f"TENSION: {self._is_tension}, ROTATION: {self._is_rotation}")
707
708        if from_state == ManualState.IDLE:
709            pass
710
711        elif from_state == ManualState.RUNNING:
712            pass
713
714        elif from_state == ManualState.RETURNING:
715            pass
716
717        elif from_state == ManualState.ZEROING:
718            pass
719
720        info("      Y: 开关旋转")
721        info("      X: 开关张紧")
722        if to_state == ManualState.IDLE:
723            for i in range(1, 9):
724                if i == 2 or i == 3:
725                    continue
726                self.stop(i)
727            info("      START: 进入 RUNNING 状态")
728            info("      B: 进入 RETURNING 状态")
729            info("      RIGHT_STICK: 进入 ZEROING 状态")
730
731        elif to_state == ManualState.RUNNING:
732            for i in range(1, 9):
733                if i == 2 or i == 3:
734                    continue
735                self.lock(i)
736            info("      START: 返回 IDLE 状态")
737            info("      B: 进入 RETURNING 状态")
738    
739        elif to_state == ManualState.RETURNING:
740            self.lock(1)
741            self._before_returning = from_state
742            info("      B: 返回之前的状态")
743
744        elif to_state == ManualState.ZEROING:
745            for i in range(1, 9):
746                if i == 2 or i == 3:
747                    continue
748                self.lock(i)
749            info("      RIGHT_STICK: 返回 IDLE 状态")
750            info("      LEFT_SHOULDER: 切换到上一个电机")
751            info("      RIGHT_SHOULDER: 切换到下一个电机")
752            info("      A: 保存当前位置为零点")
753            info(f"      当前电机: {self._zero_index}")
def lock(self, id: int) -> None:
755    def lock(self, id: int) -> None:
756        """
757        锁定指定的电机。
758
759            :param id: 电机编号
760        """
761        if self._debug:
762            info(f"{self.name} 锁定电机 {id}")
763            return
764        self._link.update(id, HoMode.V, 2.0, 0.0, 0.0)

锁定指定的电机。

:param id: 电机编号
def lock_all(self) -> None:
766    def lock_all(self) -> None:
767        """
768        锁定所有的电机。
769        """
770        for i in range(1, 9):
771            self.lock(i)

锁定所有的电机。

def stop(self, id: int) -> None:
773    def stop(self, id: int) -> None:
774        """
775        停止指定的电机。
776
777            :param id: 电机编号
778        """
779        if self._debug:
780            info(f"{self.name} 停止电机 {id}")
781            return
782        self._link.update(id, HoMode.S, 0.0, 0.0, 0.0)

停止指定的电机。

:param id: 电机编号
def stop_all(self) -> None:
784    def stop_all(self) -> None:
785        """
786        停止所有的电机。
787        """
788        for i in range(1, 9):
789            self.stop(i)

停止所有的电机。

def tension(self, on: bool, i: float = 0.8) -> None:
791    def tension(self, on: bool, i: float=0.8) -> None:
792        """
793        驱动牵引电机,张紧导管
794
795            :param on: 是否开启牵引
796            :param i: 牵引电流(A)
797        """
798        self._is_tension = on
799        if on:
800            self._link.update(2, HoMode.V, i, -360.0, 0.0)
801            self._link.update(3, HoMode.V, i, 360.0, 0.0)
802        else:
803            self.stop(2)
804            self.stop(3)

驱动牵引电机,张紧导管

:param on: 是否开启牵引
:param i: 牵引电流(A)
def rotation(self, on: bool, velocity: float = 360.0) -> None:
806    def rotation(self, on: bool, velocity: float = 360.0) -> None:
807        """
808        驱动旋转电机,旋转换能器
809
810            :param on: 是否开启旋转
811            :param velocity: 旋转速度(度/秒)
812        """
813        self._is_rotation = on
814        if on:
815            self._link.update(8, HoMode.V, 2.0, velocity, 0.0)
816        else:
817            self.stop(8)

驱动旋转电机,旋转换能器

:param on: 是否开启旋转
:param velocity: 旋转速度(度/秒)
def turn( self, x_value: float, y_value: float, ho_state: HoState) -> None:
819    def turn(self, x_value: float, y_value: float, ho_state: HoState) -> None:
820        """
821        驱动转向电机,转向导管
822
823            :param x_value: 横向控制值
824            :param y_value: 纵向控制值
825            :param ho_state: Ho 机器人状态
826        """
827        if x_value == 0 and y_value == 0:
828            for i in range(4, 8):
829                self._link.update(i, HoMode.V, 2.0, 0.0, 0.0)
830        else:
831            projection = compute_vector_projection(x_value, y_value, self._base_angles)
832            control_values = [v * self._running_scale for v in projection]
833
834            for i in range(4, 8):
835                if control_values[i-4] > 0:
836                    a_id = i
837                    b_id = (i + 2) % 4 + 4
838                    a_state = ho_state.get_state(a_id)
839                    b_state = ho_state.get_state(b_id)
840                    a_near = near(a_state.p, self._zero_angles[a_id])
841                    b_near = near(b_state.p, self._zero_angles[b_id])
842
843                    if a_near and not b_near:
844                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])
845                    elif (not a_near and b_near) or (a_near and b_near):
846                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
847                    elif not a_near and not b_near:
848                        self._link.update(a_id, HoMode.V, 2.0, control_values[i-4], 0.0)
849                        self._link.update(b_id, HoMode.P, 2.0, control_values[i-4], self._zero_angles[b_id])

驱动转向电机,转向导管

:param x_value: 横向控制值
:param y_value: 纵向控制值
:param ho_state: Ho 机器人状态
def move( self, l_value: float, r_value: float, ho_state: HoState) -> None:
851    def move(self, l_value: float, r_value: float, ho_state: HoState) -> None:
852        """
853        驱动移动电机,移动导管
854
855            :param l_value: 左侧移动控制值
856            :param r_value: 右侧移动控制值
857            :param ho_state: Ho 机器人状态
858        """
859        if l_value != 0 and r_value != 0:
860            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)
861
862        elif l_value != 0 and r_value== 0:
863            self._link.update(1, HoMode.V, 2.0, -l_value * self._running_scale, 0.0)
864
865        elif r_value != 0 and l_value== 0:
866            self._link.update(1, HoMode.V, 2.0, r_value * self._running_scale, 0.0)
867
868        else:
869            self._link.update(1, HoMode.V, 2.0, 0.0, 0.0)

驱动移动电机,移动导管

:param l_value: 左侧移动控制值
:param r_value: 右侧移动控制值
:param ho_state: Ho 机器人状态
def is_running(self) -> bool:
871    def is_running(self) -> bool:
872        """
873        判断当前节点是否处于运行状态。
874
875            :return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
876        """
877        return self._valid

判断当前节点是否处于运行状态。

:return: 如果当前节点处于运行状态,则返回 True,否则返回 False。
def enter(self) -> None:
879    def enter(self) -> None:
880        """
881        进入节点。
882        """
883        self.state_machine.freeze = False
884        self.state_machine.first_tick = True
885        self.state_machine.current_state = ManualState.IDLE
886        self._is_rotation = False
887        self._is_tension = False
888        self._valid = True

进入节点。

def exit(self) -> None:
890    def exit(self) -> None:
891        """
892        退出节点。
893        """
894        self.state_machine.freeze = True
895        self.state_machine.first_tick = True
896        self.state_machine.current_state = ManualState.IDLE
897        self._valid = False
898        self._is_rotation = False
899        self._is_tension = False
900        self.stop_all()

退出节点。

class HoManual.InputEvent:
139class InputEvent:
140    """ 输入事件基类 """
141    def __init__(self):
142        pass
143
144    def get_action_strength(self, action: str) -> float:
145        """ 返回某个动作的强度 """
146        pass
147
148    def is_action_pressed(self, action: str) -> bool:
149        """ 检查某个动作是否被按下 """
150        pass
151
152    def is_action_released(self, action: str) -> bool:
153        """ 检查某个动作是否被释放 """
154        pass

输入事件基类