Test Failed
Pull Request — master (#229)
by
unknown
04:23
created

CallbackDispatcher.dispatch()   B

Complexity

Conditions 6

Size

Total Lines 15
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 3
dl 0
loc 15
rs 8.6666
c 0
b 0
f 0
1
"""
2
server side implementation of callback event 
3
"""
4
5
from collections import OrderedDict
6
from enum import Enum
7
import asyncio
8
9
10
class CallbackType(Enum):
11
    """
12
    The possible types of a Callback type.
13
14
    :ivar Null:
15
    :ivar MonitoredItem:
16
17
    """
18
    Null = 0
19
    ItemSubscriptionCreated = 1
20
    ItemSubscriptionModified = 2
21
    ItemSubscriptionDeleted = 3
22
    WritePerformed = 4
23
24
25
class Callback(object):
26
    def __init__(self):
27
        self.__name = None
28
29
    def setName(self, name):
30
        self.__name = name
31
32
    def getName(self):
33
        return self.__name
34
35
36
class ServerItemCallback(Callback):
37
    def __init__(self, request_params, response_params, user=None):
38
        self.request_params = request_params
39
        self.response_params = response_params
40
        self.user = user
41
42
43
class CallbackSubscriberInterface(object):
44
    def getSubscribedEvents(self):
45
        raise NotImplementedError()
46
47
48
class CallbackDispatcher(object):
49
    def __init__(self):
50
        self._listeners = {}
51
52
    async def dispatch(self, eventName, event=None):
53
        if event is None:
54
            event = Callback()
55
        elif not isinstance(event, Callback):
56
            raise ValueError('Unexpected event type given')
57
        event.setName(eventName)
58
        if eventName not in self._listeners:
59
            return event
60
        for listener in self._listeners[eventName].values():
61
            if asyncio.iscoroutinefunction(listener):
62
                await listener(event, self)
63
            else:
64
                listener(event, self)
65
66
        return event
67
68
    def addListener(self, eventName, listener, priority=0):
69
        if eventName not in self._listeners:
70
            self._listeners[eventName] = {}
71
        self._listeners[eventName][priority] = listener
72
        self._listeners[eventName] = OrderedDict(sorted(self._listeners[eventName].items(), key=lambda item: item[0]))
73
74
    def removeListener(self, eventName, listener=None):
75
        if eventName not in self._listeners:
76
            return
77
        if not listener:
78
            del self._listeners[eventName]
79
        else:
80
            for p, l in self._listeners[eventName].items():
81
                if l is listener:
82
                    self._listeners[eventName].pop(p)
83
                    return
84
85
    def addSubscriber(self, subscriber):
86
        if not isinstance(subscriber, CallbackSubscriberInterface):
87
            raise ValueError('Unexpected subscriber type given')
88
        for eventName, params in subscriber.getSubscribedEvents().items():
89
            if isinstance(params, str):
90
                self.addListener(eventName, getattr(subscriber, params))
91
            elif isinstance(params, list):
92
                if not params:
93
                    raise ValueError(f'Invalid params "{repr(params)}" for event "{str(eventName)}"')
94
                if len(params) <= 2 and isinstance(params[0], str):
95
                    priority = params[1] if len(params) > 1 else 0
96
                    self.addListener(eventName, getattr(subscriber, params[0]), priority)
97
                else:
98
                    for listener in params:
99
                        priority = listener[1] if len(listener) > 1 else 0
100
                        self.addListener(eventName, getattr(subscriber, listener[0]), priority)
101
            else:
102
                raise ValueError(f'Invalid params for event "{str(eventName)}"')
103