GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

MessageController.receive_message()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 22
ccs 0
cts 10
cp 0
crap 30
rs 8.3411
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import pickle
9
from time import sleep
10
11
import zmq
12
from zmq.utils import jsonapi
13
14
from enarksh.event.Event import Event
15
from enarksh.event.EventActor import EventActor
16
from enarksh.message.Message import Message
17
18
19
class MessageController(EventActor):
    """
    A message controller for receiving messages and firing the appropriate events.

    The controller owns a ZMQ context and a set of named end points (ZMQ sockets). Incoming messages are read from
    the PULL and REP end points, decoded (JSON or pickle), and dispatched by firing the event that has been
    registered for the message type of the message.
    """

    _json_message_creators = {}
    """
    All registered message creators from JSON data. A dict from message type to the function that creates a message
    from decoded JSON data. This dict is shared by all instances of this class.

    :type: dict[str,callable]
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self):
        """
        Object constructor.

        Besides initialising the object, this constructor registers the object as the message controller of
        class Message.
        """
        EventActor.__init__(self)

        Message.message_controller = self

        self.__zmq_context = zmq.Context()
        """
        The ZMQ context.

        :type: None|zmq.Context
        """

        self.__message_types = {}
        """
        All registered message types. A dict from message type to the event that must fire when a message of
        that message type has been received.

        :type: dict[str,enarksh.event.Event.Event]
        """

        self.__end_points = {}
        """
        All registered end points. A dict from the name of the end point to the ZMQ socket of the end point.

        :type: dict[str,zmq.sugar.socket.Socket]
        """

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def end_points(self):
        """
        Returns all end points (the internal dict itself, not a copy).

        :rtype: dict[str,zmq.sugar.socket.Socket]
        """
        return self.__end_points

    # ------------------------------------------------------------------------------------------------------------------
    def register_end_point(self, name, socket_type, end_point):
        """
        Registers an end point.

        Sockets for incoming messages (zmq.PULL and zmq.REP) are bound to the end point, sockets for outgoing
        messages (zmq.PUSH) are connected to the end point. The socket type is validated before a socket is created
        and the socket is registered only after it has been bound or connected successfully. Hence, a failed
        registration leaves neither an open socket nor an unusable end point behind.

        :param str name: The name of the end point.
        :param int socket_type: The socket type, one of
                                - zmq.PULL for asynchronous incoming messages
                                - zmq.REP for lockstep incoming messages
                                - zmq.PUSH for asynchronous outgoing messages
        :param str end_point: The end point.

        :raises ValueError: If the socket type is not one of zmq.PULL, zmq.REP, and zmq.PUSH.
        """
        if socket_type in (zmq.PULL, zmq.REP):
            incoming = True
        elif socket_type == zmq.PUSH:
            incoming = False
        else:
            raise ValueError("Unknown socket type {0}".format(socket_type))

        socket = self.__zmq_context.socket(socket_type)
        try:
            if incoming:
                socket.bind(end_point)
            else:
                socket.connect(end_point)
        except Exception:
            # Do not leak the socket when the end point cannot be bound or connected.
            socket.close()
            raise

        self.__end_points[name] = socket

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def register_json_message_creator(message_type, creator):
        """
        Registers a function that creates a message from JSON encoded data.

        An earlier registered creator for the same message type is replaced.

        :param str message_type: The message type to register.
        :param callable creator: The function for creating a message from JSON encoded data. The function is called
                                 with the decoded JSON data as its only argument.
        """
        MessageController._json_message_creators[message_type] = creator

    # ------------------------------------------------------------------------------------------------------------------
    def register_message_type(self, message_type):
        """
        Registers a message type together with a new event that fires when a message of this type is received.

        Registering a message type again replaces its event with a new one.

        :param str message_type: The message type to register.
        """
        self.__message_types[message_type] = Event(self)

    # ------------------------------------------------------------------------------------------------------------------
    def get_event_by_message_type(self, message_type):
        """
        Returns the event that will be fired when a message of a certain message type has been received.

        :param str message_type: The message type.

        :rtype: enarksh.event.Event.Event

        :raises ValueError: If the message type has not been registered.
        """
        if message_type not in self.__message_types:
            raise ValueError("Unknown message type '{0}'".format(message_type))

        return self.__message_types[message_type]

    # ------------------------------------------------------------------------------------------------------------------
    def register_listener(self, message_type, listener, listener_data=None):
        """
        Registers an object as a listener for the event fired when a message has been received.

        :param str message_type: The message type.
        :param callable listener: An object that listens for the event.
        :param * listener_data: Additional data supplied by the listener destination.

        :raises ValueError: If the message type has not been registered.
        """
        self.get_event_by_message_type(message_type).register_listener(listener, listener_data)

    # ------------------------------------------------------------------------------------------------------------------
    def _make_incoming_poller(self):
        """
        Returns a poller on which all incoming (zmq.PULL and zmq.REP) end points are registered for reading.

        :rtype: zmq.Poller
        """
        poller = zmq.Poller()
        for socket in self.__end_points.values():
            if socket.type in (zmq.PULL, zmq.REP):
                poller.register(socket, zmq.POLLIN)

        return poller

    # ------------------------------------------------------------------------------------------------------------------
    def _receive_message(self, name, socket):
        """
        Receives an incoming message from a ZMQ socket and fires the event registered for its message type.

        A message starting with '{' is decoded as JSON and turned into a message by the creator registered for the
        value of its 'type' key. Any other message is unpickled.

        :param str name: The name of the end point that is the source of the message.
        :param zmq.sugar.socket.Socket socket: The ZMQ socket.

        :raises ValueError: If the message type of the received message is unknown.
        """
        buffer = socket.recv()
        if buffer[:1] == b'{':
            data = jsonapi.loads(buffer)
            message_type = data['type']
            if message_type not in self._json_message_creators:
                raise ValueError("Received JSON message with unknown message type '{0}'".format(message_type))
            message = self._json_message_creators[message_type](data)
        else:
            # SECURITY: unpickling data can execute arbitrary code. This is only safe when all end points are
            # reachable by trusted processes only.
            message = pickle.loads(buffer)
            """:type: enarksh.message.Message.Message"""
            # NOTE(review): the message source is set for pickled messages only, not for JSON messages. Confirm that
            # this is intended.
            message.message_source = name

        if message.message_type not in self.__message_types:
            raise ValueError("Received message with unknown message type '{0}'".format(message.message_type))

        event = self.__message_types[message.message_type]
        event.fire(message)

    # ------------------------------------------------------------------------------------------------------------------
    def receive_message(self, event, event_data, listener_data):
        """
        Receives messages from other processes.

        Blocks until at least one incoming end point is ready for reading and then receives one message from each
        end point that is ready.

        :param * event: Not used.
        :param * event_data: Not used.
        :param * listener_data: Not used.
        """
        del event, event_data, listener_data

        # Make a poller for all incoming sockets.
        poller = self._make_incoming_poller()

        # Wait (without timeout) until a socket is ready for reading.
        ready = dict(poller.poll())

        for name, socket in self.__end_points.items():
            if socket in ready:
                self._receive_message(name, socket)

    # ------------------------------------------------------------------------------------------------------------------
    def send_message(self, end_point, message, json=False):
        """
        Sends a message to an end point.

        :param str end_point: The name of the end point.
        :param enarksh.message.Message.Message|* message: The message.
        :param bool json: If True the message will be sent as a JSON encoded string, otherwise the message will be
                          sent as a pickled Python object.

        :raises KeyError: If no end point has been registered under the given name.
        """
        socket = self.__end_points[end_point]
        if json:
            socket.send_json(message)
        else:
            socket.send_pyobj(message)

    # ------------------------------------------------------------------------------------------------------------------
    def no_barking(self, seconds):
        """
        During start up of ZMQ the incoming file descriptors become 'ready for reading' while there is no message on
        the socket. This method prevents incoming sockets from barking that they are ready for reading.

        :param int seconds: The number of seconds to give the other ZMQ thread to start up.
        """
        sleep(seconds)

        # The set of end points does not change while polling, hence one poller suffices for all rounds.
        poller = self._make_incoming_poller()

        # NOTE(review): this polls one time less than the number of end points (not at all with a single end point).
        # Confirm whether this is intended.
        for _ in range(1, len(self.end_points)):
            # Poll with a timeout of 1 millisecond and ignore the result.
            poller.poll(1)
219
220
# ----------------------------------------------------------------------------------------------------------------------
221