robotengine.signal

Signal 是 robotengine 实现节点间通信和异步调用的基础。

 1"""
 2
 3Signal 是 robotengine 实现节点间通信和异步调用的基础。
 4
 5"""
 6
 7from typing import Callable
 8from robotengine.tools import warning, get_variable_name
 9import threading
10
11class Signal:
12    """ 信号类 """
13    def __init__(self, *param_types):
14        """ 
15        初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如:
16
17            signal = Signal(int, float, str)
18
19        如果是复杂的类型,则要使用 python 库中的 typing 模块,例如:
20
21            from typing import List, Dict
22
23            signal = Signal(List[int], Dict[str, float])
24        """
25        self._callbacks = []
26        self._param_types = param_types  # 存储信号的预期参数类型
27
28    def connect(self, callback: Callable):
29        """ 
30        连接信号,需要指定一个回调函数 
31
32            :param callback: 回调函数
33
34        注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。
35        """
36        if callback not in self._callbacks:
37            self._callbacks.append(callback)
38        else:
39            warning(f"{callback} 已经存在,请勿重复添加")
40
41    def disconnect(self, callback: Callable):
42        """ 
43        断开信号,需要指定一个回调函数 
44        
45            :param callback: 回调函数
46        """
47        if callback in self._callbacks:
48            self._callbacks.remove(callback)
49        else:
50            warning(f"{callback} 不存在,请勿重复删除")
51
52    def emit(self, *args, **kwargs):
53        """ 
54        触发信号,需要指定信号的参数 
55
56        注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。
57        """
58        if len(args) != len(self._param_types):
59            raise TypeError(f"Expected {len(self._param_types)} arguments, but got {len(args)}")
60
61        for expected_type, actual_arg in zip(self._param_types, args):
62            if not isinstance(actual_arg, expected_type):
63                raise TypeError(f"Expected argument of type {expected_type}, but got {type(actual_arg)}")
64        
65        v_name = get_variable_name(self)
66        if v_name is None:
67            thread_name = "SignalThread"
68        else:
69            thread_name = v_name + "SignalThread"
70
71        new_thread = threading.Thread(target=self._emit, args=args, kwargs=kwargs, daemon=True, name=thread_name)
72        new_thread.start()
73    
74    def _emit(self, *args, **kwargs):
75        for callback in self._callbacks:
76            callback(*args, **kwargs)
77
78    def __repr__(self):
79        return f"Signal(connected callbacks={len(self._callbacks)})"
class Signal:
12class Signal:
13    """ 信号类 """
14    def __init__(self, *param_types):
15        """ 
16        初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如:
17
18            signal = Signal(int, float, str)
19
20        如果是复杂的类型,则要使用 python 库中的 typing 模块,例如:
21
22            from typing import List, Dict
23
24            signal = Signal(List[int], Dict[str, float])
25        """
26        self._callbacks = []
27        self._param_types = param_types  # 存储信号的预期参数类型
28
29    def connect(self, callback: Callable):
30        """ 
31        连接信号,需要指定一个回调函数 
32
33            :param callback: 回调函数
34
35        注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。
36        """
37        if callback not in self._callbacks:
38            self._callbacks.append(callback)
39        else:
40            warning(f"{callback} 已经存在,请勿重复添加")
41
42    def disconnect(self, callback: Callable):
43        """ 
44        断开信号,需要指定一个回调函数 
45        
46            :param callback: 回调函数
47        """
48        if callback in self._callbacks:
49            self._callbacks.remove(callback)
50        else:
51            warning(f"{callback} 不存在,请勿重复删除")
52
53    def emit(self, *args, **kwargs):
54        """ 
55        触发信号,需要指定信号的参数 
56
57        注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。
58        """
59        if len(args) != len(self._param_types):
60            raise TypeError(f"Expected {len(self._param_types)} arguments, but got {len(args)}")
61
62        for expected_type, actual_arg in zip(self._param_types, args):
63            if not isinstance(actual_arg, expected_type):
64                raise TypeError(f"Expected argument of type {expected_type}, but got {type(actual_arg)}")
65        
66        v_name = get_variable_name(self)
67        if v_name is None:
68            thread_name = "SignalThread"
69        else:
70            thread_name = v_name + "SignalThread"
71
72        new_thread = threading.Thread(target=self._emit, args=args, kwargs=kwargs, daemon=True, name=thread_name)
73        new_thread.start()
74    
75    def _emit(self, *args, **kwargs):
76        for callback in self._callbacks:
77            callback(*args, **kwargs)
78
79    def __repr__(self):
80        return f"Signal(connected callbacks={len(self._callbacks)})"

信号类

Signal(*param_types)
14    def __init__(self, *param_types):
15        """ 
16        初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如:
17
18            signal = Signal(int, float, str)
19
20        如果是复杂的类型,则要使用 python 库中的 typing 模块,例如:
21
22            from typing import List, Dict
23
24            signal = Signal(List[int], Dict[str, float])
25        """
26        self._callbacks = []
27        self._param_types = param_types  # 存储信号的预期参数类型

初始化信号,需要指定信号的参数类型以保证信号触发时的类型安全,例如:

signal = Signal(int, float, str)

如果是复杂的类型,则要使用 python 库中的 typing 模块,例如:

from typing import List, Dict

signal = Signal(List[int], Dict[str, float])
def connect(self, callback: Callable):
29    def connect(self, callback: Callable):
30        """ 
31        连接信号,需要指定一个回调函数 
32
33            :param callback: 回调函数
34
35        注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。
36        """
37        if callback not in self._callbacks:
38            self._callbacks.append(callback)
39        else:
40            warning(f"{callback} 已经存在,请勿重复添加")

连接信号,需要指定一个回调函数

:param callback: 回调函数

注意,回调函数的参数类型必须与信号的参数类型一致,否则会抛出 TypeError 异常。

def disconnect(self, callback: Callable):
42    def disconnect(self, callback: Callable):
43        """ 
44        断开信号,需要指定一个回调函数 
45        
46            :param callback: 回调函数
47        """
48        if callback in self._callbacks:
49            self._callbacks.remove(callback)
50        else:
51            warning(f"{callback} 不存在,请勿重复删除")

断开信号,需要指定一个回调函数

:param callback: 回调函数
def emit(self, *args, **kwargs):
53    def emit(self, *args, **kwargs):
54        """ 
55        触发信号,需要指定信号的参数 
56
57        注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。
58        """
59        if len(args) != len(self._param_types):
60            raise TypeError(f"Expected {len(self._param_types)} arguments, but got {len(args)}")
61
62        for expected_type, actual_arg in zip(self._param_types, args):
63            if not isinstance(actual_arg, expected_type):
64                raise TypeError(f"Expected argument of type {expected_type}, but got {type(actual_arg)}")
65        
66        v_name = get_variable_name(self)
67        if v_name is None:
68            thread_name = "SignalThread"
69        else:
70            thread_name = v_name + "SignalThread"
71
72        new_thread = threading.Thread(target=self._emit, args=args, kwargs=kwargs, daemon=True, name=thread_name)
73        new_thread.start()

触发信号,需要指定信号的参数

注意,信号触发后执行的回调函数是异步的,不会阻塞主线程。