Passed
Pull Request — master (#229)
by
unknown
03:09
created

asyncua.common.callback.CallbackService.dispatch()   A

Complexity

Conditions 5

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 11
nop 3
dl 0
loc 12
rs 9.3333
c 0
b 0
f 0
1
"""
2
server side implementation of callback event 
3
"""
4
5
from collections import OrderedDict
6
from enum import Enum
7
import asyncio
8
9
10
class CallbackType(Enum):
11
    """
12
    The possible types of a Callback type.
13
14
    :ivar Null:
15
    :ivar MonitoredItem:
16
17
    """
18
    Null = 0
19
    ItemSubscriptionCreated = 1
20
    ItemSubscriptionModified = 2
21
    ItemSubscriptionDeleted = 3
22
    WritePerformed = 4
23
24
25
class Callback(object):
26
    def __init__(self):
27
        self.__name = None
28
29
    def setName(self, name):
30
        self.__name = name
31
32
    def getName(self):
33
        return self.__name
34
35
36
class ServerItemCallback(Callback):
37
    def __init__(self, request_params, response_params, user=None):
38
        self.request_params = request_params
39
        self.response_params = response_params
40
        self.user = user
41
42
43
class CallbackSubscriberInterface(object):
44
    def getSubscribedEvents(self):
45
        raise NotImplementedError()
46
47
48
class CallbackService(object):
49
    def __init__(self):
50
        self._listeners = {}
51
52
    async def dispatch(self, eventName, event=None):
53
        if event is None:
54
            event = Callback()
55
        elif not isinstance(event, Callback):
56
            raise ValueError('Unexpected event type given')
57
        event.setName(eventName)
58
        if eventName not in self._listeners:
59
            return event
60
        for listener in self._listeners[eventName].values():
61
            await self.call_listener(event, listener)
62
63
        return event
64
65
    async def call_listener(self, event, listener):
66
        if asyncio.iscoroutinefunction(listener):
67
            await listener(event, self)
68
        else:
69
            listener(event, self)
70
71
    def addListener(self, eventName, listener, priority=0):
72
        if eventName not in self._listeners:
73
            self._listeners[eventName] = {}
74
        self._listeners[eventName][priority] = listener
75
        self._listeners[eventName] = OrderedDict(sorted(self._listeners[eventName].items(), key=lambda item: item[0]))
76
77
    def removeListener(self, eventName, listener=None):
78
        if eventName not in self._listeners:
79
            return
80
        if not listener:
81
            del self._listeners[eventName]
82
        else:
83
            for p, l in self._listeners[eventName].items():
84
                if l is listener:
85
                    self._listeners[eventName].pop(p)
86
                    return
87
88
    def addSubscriber(self, subscriber):
89
        if not isinstance(subscriber, CallbackSubscriberInterface):
90
            raise ValueError('Unexpected subscriber type given')
91
        for eventName, params in subscriber.getSubscribedEvents().items():
92
            if isinstance(params, str):
93
                self.addListener(eventName, getattr(subscriber, params))
94
            elif isinstance(params, list):
95
                if not params:
96
                    raise ValueError(f'Invalid params "{repr(params)}" for event "{str(eventName)}"')
97
                if len(params) <= 2 and isinstance(params[0], str):
98
                    priority = params[1] if len(params) > 1 else 0
99
                    self.addListener(eventName, getattr(subscriber, params[0]), priority)
100
                else:
101
                    for listener in params:
102
                        priority = listener[1] if len(listener) > 1 else 0
103
                        self.addListener(eventName, getattr(subscriber, listener[0]), priority)
104
            else:
105
                raise ValueError(f'Invalid params for event "{str(eventName)}"')
106