Source code for hbutils.design.observer

"""
Overview:
    Implementation of observer pattern.
    See `Observer Pattern - Wikipedia <https://en.wikipedia.org/wiki/Observer_pattern>`_.
"""
from enum import Enum
from typing import Union, Type, TypeVar, Dict, Callable, Tuple, Any, List

__all__ = [
    'Observable'
]

_EventSetType = Union[Type[Enum], list, tuple]


def _auto_members(events: _EventSetType):
    if isinstance(events, type) and issubclass(events, Enum):
        return list(events.__members__.values())
    elif isinstance(events, (list, tuple)):
        return list(events)
    else:
        raise TypeError(f'Invalid event set - {repr(events)}.')


def _get_object_id(obj) -> Tuple[str, int]:
    try:
        return 'hash', hash(obj)
    except TypeError:
        return 'id', id(obj)


_EventType = TypeVar('_EventType')
_SubscriberType = TypeVar('_SubscriberType')
_CallbackType = TypeVar('_CallbackType', bound=Callable[..., Any])


class _CallbackWrapper:
    def __init__(self, callback):
        from ..reflection import dynamic_call, sigsupply
        self.raw = callback
        self._callback = dynamic_call(sigsupply(callback, lambda x: None))

    def __call__(self, *args, **kwargs):
        return self._callback(*args, **kwargs)


[docs]class Observable: """ Overview: Observable object. * :meth:`subscribe` can be used for subscribing on specific event. * :meth:`unsubscribe` can be used for unsubscribing from specific event. * :meth:`dispatch` can be used for broadcasting a specific event, and all the subscribed callback will be \ triggered. Examples:: >>> from enum import IntEnum, unique >>> from hbutils.design import Observable >>> >>> @unique ... class MyIntEnum(IntEnum): ... A = 1 ... B = 2 >>> >>> o = Observable(MyIntEnum) >>> list_a, list_b = [], [] >>> o.subscribe(MyIntEnum.A, list_a, 'append') # use list_a.append >>> o.subscribe(MyIntEnum.B, list_a, lambda v: list_a.append(v)) # custom function. >>> o.subscribe(MyIntEnum.A, list_b, 'append') # use list_b.append >>> >>> list_a, list_b ([], []) >>> o.dispatch(MyIntEnum.A) >>> list_a, list_b ([<MyIntEnum.A: 1>], [<MyIntEnum.A: 1>]) >>> o.dispatch(MyIntEnum.B) >>> list_a, list_b ([<MyIntEnum.A: 1>, <MyIntEnum.B: 2>], [<MyIntEnum.A: 1>]) >>> >>> o.unsubscribe(MyIntEnum.A, list_a) >>> o.dispatch(MyIntEnum.A) >>> list_a, list_b ([<MyIntEnum.A: 1>, <MyIntEnum.B: 2>], [<MyIntEnum.A: 1>, <MyIntEnum.A: 1>]) >>> o.dispatch(MyIntEnum.B) >>> list_a, list_b ([<MyIntEnum.A: 1>, <MyIntEnum.B: 2>, <MyIntEnum.B: 2>], [<MyIntEnum.A: 1>, <MyIntEnum.A: 1>]) """
[docs] def __init__(self, events: _EventSetType): """ Constructor of :class:`Observable`. :param events: Set of events, can be a list, tuple or an enum class. .. note:: When enum is used, its values will be used as events. For example: >>> from enum import IntEnum >>> from hbutils.design import Observable >>> >>> class MyIntEnum(IntEnum): ... A = 1 ... B = 2 >>> >>> # equals to `Observable([MyIntEnum.A, MyIntEnum.B])` ... o = Observable(MyIntEnum) >>> o._events # just for explanation, do not do this on actual use {<MyIntEnum.A: 1>: {}, <MyIntEnum.B: 2>: {}} """ events = _auto_members(events) self._observers: Dict[Tuple[str, int], _SubscriberType] = {} self._events: Dict[_EventType, Dict[Tuple[str, int], _CallbackWrapper]] = {e: {} for e in events}
[docs] def subscribers(self, event: _EventType) -> List[_SubscriberType]: """ Get subscribers of the given ``event``. :param event: Event for querying. :return: A list of subscribers. """ return [self._observers[id_] for id_ in self._get_subscriptions(event).keys()]
[docs] def subscriptions(self, event: _EventType) -> List[Tuple[_SubscriberType, _CallbackType]]: """ Get subscriptions of the given ``event``. :param event: Event for querying. :return: A list of tuples with subscribers and their callbacks. """ return [ (self._observers[id_], callback.raw) for id_, callback in self._get_subscriptions(event).items() ]
def _get_subscriptions(self, event: _EventType) -> Dict[Tuple[str, int], _CallbackWrapper]: return self._events[event] def _put_subscription(self, event: _EventType, subscriber: _SubscriberType, callback: _CallbackType): subscriber_id = _get_object_id(subscriber) self._get_subscriptions(event)[subscriber_id] = _CallbackWrapper(callback) self._observers[subscriber_id] = subscriber def _del_subscription(self, event: _EventType, subscriber: _SubscriberType): try: del self._get_subscriptions(event)[_get_object_id(subscriber)] except KeyError: raise KeyError(subscriber)
[docs] def subscribe(self, event: _EventType, subscriber: _SubscriberType, callback: Union[_CallbackType, str, None] = None): """ Subscribe to the given ``event``. :param event: Event to be subscribed. :param subscriber: Subscriber of this subscription. :param callback: Callback function. If ``str`` is given, method with this name on ``subscriber`` will be used. \ Default is ``None`` which means the ``update`` method on ``subscriber`` will be used. .. note:: Callback function should have no more than 2 positional arguments. For example: >>> o.subscribe(MyIntEnum.A, 'user1', lambda: 2) # ok >>> o.subscribe(MyIntEnum.A, 'user2', lambda event: 2) # ok >>> o.subscribe(MyIntEnum.A, 'user3', lambda event, obs: 2) # ok >>> o.subscribe(MyIntEnum.A, 'user4', lambda x, y, z: 2) # X """ if callback is None: callback = getattr(subscriber, 'update') elif isinstance(callback, str): callback = getattr(subscriber, callback) if not callable(callback): raise TypeError(f'Callback should be callable, but {repr(callback)} found.') self._put_subscription(event, subscriber, callback)
[docs] def unsubscribe(self, event: _EventType, subscriber: _SubscriberType): """ Unsubscribe from the given ``event``. :param event: Event to be unsubscribed. :param subscriber: Subscriber of this unsubscription. """ self._del_subscription(event, subscriber)
[docs] def dispatch(self, event: _EventType): """ Dispatch event. :param event: Event to be dispatched. """ for _, callback in self._get_subscriptions(event).items(): callback(event, self)