robotengine.signal
Signal 是 robotengine 实现节点间通信和异步调用的基础。
1""" 2 3Signal 是 robotengine 实现节点间通信和异步调用的基础。 4 5""" 6 7from typing import Callable 8from robotengine.tools import warning, get_variable_name 9import threading 10 11class Signal: 12 """ 信号类 """ 13 def __init__(self, *param_types): 14 """ 15 初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如: 16 17 signal = Signal(int, float, str) 18 19 如果是复杂的类型,则要使用 python 库中的 typing 模块,例如: 20 21 from typing import List, Dict 22 23 signal = Signal(List[int], Dict[str, float]) 24 """ 25 self._callbacks = [] 26 self._param_types = param_types # 存储信号的预期参数类型 27 28 def connect(self, callback: Callable): 29 """ 30 连接信号,需要指定一个回调函数 31 32 :param callback: 回调函数 33 34 注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。 35 """ 36 if callback not in self._callbacks: 37 self._callbacks.append(callback) 38 else: 39 warning(f"{callback} 已经存在,请勿重复添加") 40 41 def disconnect(self, callback: Callable): 42 """ 43 断开信号,需要指定一个回调函数 44 45 :param callback: 回调函数 46 """ 47 if callback in self._callbacks: 48 self._callbacks.remove(callback) 49 else: 50 warning(f"{callback} 不存在,请勿重复删除") 51 52 def emit(self, *args, **kwargs): 53 """ 54 触发信号,需要指定信号的参数 55 56 注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。 57 """ 58 if len(args) != len(self._param_types): 59 raise TypeError(f"Expected {len(self._param_types)} arguments, but got {len(args)}") 60 61 for expected_type, actual_arg in zip(self._param_types, args): 62 if not isinstance(actual_arg, expected_type): 63 raise TypeError(f"Expected argument of type {expected_type}, but got {type(actual_arg)}") 64 65 v_name = get_variable_name(self) 66 if v_name is None: 67 thread_name = "SignalThread" 68 else: 69 thread_name = v_name + "SignalThread" 70 71 new_thread = threading.Thread(target=self._emit, args=args, kwargs=kwargs, daemon=True, name=thread_name) 72 new_thread.start() 73 74 def _emit(self, *args, **kwargs): 75 for callback in self._callbacks: 76 callback(*args, **kwargs) 77 78 def __repr__(self): 79 return f"Signal(connected callbacks={len(self._callbacks)})"
class
Signal:
12class Signal: 13 """ 信号类 """ 14 def __init__(self, *param_types): 15 """ 16 初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如: 17 18 signal = Signal(int, float, str) 19 20 如果是复杂的类型,则要使用 python 库中的 typing 模块,例如: 21 22 from typing import List, Dict 23 24 signal = Signal(List[int], Dict[str, float]) 25 """ 26 self._callbacks = [] 27 self._param_types = param_types # 存储信号的预期参数类型 28 29 def connect(self, callback: Callable): 30 """ 31 连接信号,需要指定一个回调函数 32 33 :param callback: 回调函数 34 35 注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。 36 """ 37 if callback not in self._callbacks: 38 self._callbacks.append(callback) 39 else: 40 warning(f"{callback} 已经存在,请勿重复添加") 41 42 def disconnect(self, callback: Callable): 43 """ 44 断开信号,需要指定一个回调函数 45 46 :param callback: 回调函数 47 """ 48 if callback in self._callbacks: 49 self._callbacks.remove(callback) 50 else: 51 warning(f"{callback} 不存在,请勿重复删除") 52 53 def emit(self, *args, **kwargs): 54 """ 55 触发信号,需要指定信号的参数 56 57 注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。 58 """ 59 if len(args) != len(self._param_types): 60 raise TypeError(f"Expected {len(self._param_types)} arguments, but got {len(args)}") 61 62 for expected_type, actual_arg in zip(self._param_types, args): 63 if not isinstance(actual_arg, expected_type): 64 raise TypeError(f"Expected argument of type {expected_type}, but got {type(actual_arg)}") 65 66 v_name = get_variable_name(self) 67 if v_name is None: 68 thread_name = "SignalThread" 69 else: 70 thread_name = v_name + "SignalThread" 71 72 new_thread = threading.Thread(target=self._emit, args=args, kwargs=kwargs, daemon=True, name=thread_name) 73 new_thread.start() 74 75 def _emit(self, *args, **kwargs): 76 for callback in self._callbacks: 77 callback(*args, **kwargs) 78 79 def __repr__(self): 80 return f"Signal(connected callbacks={len(self._callbacks)})"
信号类
Signal(*param_types)
14 def __init__(self, *param_types): 15 """ 16 初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如: 17 18 signal = Signal(int, float, str) 19 20 如果是复杂的类型,则要使用 python 库中的 typing 模块,例如: 21 22 from typing import List, Dict 23 24 signal = Signal(List[int], Dict[str, float]) 25 """ 26 self._callbacks = [] 27 self._param_types = param_types # 存储信号的预期参数类型
初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如:
signal = Signal(int, float, str)
如果是复杂的类型,则要使用 python 库中的 typing 模块,例如:
from typing import List, Dict
signal = Signal(List[int], Dict[str, float])
def
connect(self, callback: Callable):
29 def connect(self, callback: Callable): 30 """ 31 连接信号,需要指定一个回调函数 32 33 :param callback: 回调函数 34 35 注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。 36 """ 37 if callback not in self._callbacks: 38 self._callbacks.append(callback) 39 else: 40 warning(f"{callback} 已经存在,请勿重复添加")
连接信号,需要指定一个回调函数
:param callback: 回调函数
注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。
def
disconnect(self, callback: Callable):
42 def disconnect(self, callback: Callable): 43 """ 44 断开信号,需要指定一个回调函数 45 46 :param callback: 回调函数 47 """ 48 if callback in self._callbacks: 49 self._callbacks.remove(callback) 50 else: 51 warning(f"{callback} 不存在,请勿重复删除")
断开信号,需要指定一个回调函数
:param callback: 回调函数
def
emit(self, *args, **kwargs):
53 def emit(self, *args, **kwargs): 54 """ 55 触发信号,需要指定信号的参数 56 57 注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。 58 """ 59 if len(args) != len(self._param_types): 60 raise TypeError(f"Expected {len(self._param_types)} arguments, but got {len(args)}") 61 62 for expected_type, actual_arg in zip(self._param_types, args): 63 if not isinstance(actual_arg, expected_type): 64 raise TypeError(f"Expected argument of type {expected_type}, but got {type(actual_arg)}") 65 66 v_name = get_variable_name(self) 67 if v_name is None: 68 thread_name = "SignalThread" 69 else: 70 thread_name = v_name + "SignalThread" 71 72 new_thread = threading.Thread(target=self._emit, args=args, kwargs=kwargs, daemon=True, name=thread_name) 73 new_thread.start()
触发信号,需要指定信号的参数
注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。