GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( 5e962d...d4663b )
by P.R.
03:14
created

MessageController.destroy()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 8
rs 9.4285
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
from time import sleep
9
10
import zmq
11
12
from enarksh.event.Event import Event
13
from enarksh.event.EventActor import EventActor
14
from enarksh.message.Message import Message
15
16
17
class MessageController(EventActor):
18
    """
19
    A message controller for receiving messages and firing the appropriate events.
20
    """
21
22
    # ------------------------------------------------------------------------------------------------------------------
23
    def __init__(self):
24
        """
25
        Object constructor.
26
        """
27
        EventActor.__init__(self)
28
29
        Message.message_controller = self
30
31
        self._zmq_context = zmq.Context()
32
        """
33
        The ZMQ context.
34
35
        :type: None|zmq.Context
36
        """
37
38
        self._message_types = {}
39
        """
40
        All registered message types. A dict from message type to the event that must fire when a message of
41
        that message type has been received.
42
43
        :type: dict[str,enarksh.event.Event.Event]
44
        """
45
46
        self._end_points = {}
47
        """
48
        All registered end points.
49
50
        :type: dict[str,zmq.sugar.socket.Socket]
51
        """
52
53
    # ------------------------------------------------------------------------------------------------------------------
54
    @property
55
    def end_points(self):
56
        """
57
        Returns all end points.
58
59
        :rtype: dict[str,zmq.sugar.socket.Socket]
60
        """
61
        return self._end_points
62
63
    # ------------------------------------------------------------------------------------------------------------------
64
    def register_end_point(self, name, socket_type, end_point):
65
        """
66
        Registers an end point.
67
68
        :param str name: The name of the end point.
69
        :param int socket_type: The socket type, one of
70
                                - zmq.PULL for asynchronous incoming messages
71
                                - zmq.REP for lockstep incoming messages
72
                                - zmq.PUSH for asynchronous outgoing messages
73
        :param str end_point: The end point.
74
        """
75
        socket = self._zmq_context.socket(socket_type)
76
        self._end_points[name] = socket
77
        if socket_type in [zmq.PULL, zmq.REP]:
78
            socket.bind(end_point)
79
        elif socket_type == zmq.PUSH:
80
            socket.connect(end_point)
81
        else:
82
            raise ValueError("Unknown socket type {0}".format(socket_type))
83
84
    # ------------------------------------------------------------------------------------------------------------------
85
    def register_message_type(self, message_type):
86
        """
87
        Registers a message type together with the event and message constructor.
88
89
        :param str message_type: The message type to register.
90
        """
91
        self._message_types[message_type] = Event(self)
92
93
    # ------------------------------------------------------------------------------------------------------------------
94
    def get_event_by_message_type(self, message_type):
95
        """
96
        Returns the event that will be fired when a message a certain message type is been received.
97
98
        :param str message_type: The messaged type
99
100
        :rtype: enarksh.event.Event.Event
101
        """
102
        if message_type not in self._message_types:
103
            raise ValueError("Unknown message type '{0}'".format(message_type))
104
105
        return self._message_types[message_type]
106
107
    # ------------------------------------------------------------------------------------------------------------------
108
    def register_listener(self, message_type, listener, listener_data=None):
109
        """
110
        Registers an object as a listener for the event fired when a message has been received.
111
112
        :param str message_type: The message type.
113
        :param callable listener: An object that listen for an event.
114
        :param * listener_data: Additional data supplied by the listener destination.
115
        """
116
        self.get_event_by_message_type(message_type).register_listener(listener, listener_data)
117
118
    # ------------------------------------------------------------------------------------------------------------------
119
    def _receive_message(self, name, socket):
120
        """
121
        Receives an incoming message from a ZMQ socket.
122
123
        :param str name: The name of the end point of source of the message.
124
        :param zmq.sugar.socket.Socket socket: The ZMQ socket.
125
126
        XXX @todo Support JSON messages for communication with PHP front end.
127
        """
128
        message = socket.recv_pyobj()
129
        """:type: enarksh.message.Message.Message"""
130
        message.message_source = name
131
132
        if message.message_type not in self._message_types:
133
            raise ValueError("Received message with unknown message type '{0}'".format(message.message_type))
134
135
        event = self._message_types[message.message_type]
136
        event.fire(message)
137
138
    # ------------------------------------------------------------------------------------------------------------------
139
    def receive_message(self, event, event_data, listener_data):
140
        """
141
        Receives a messages from another processes.
142
143
        :param * event: Not used.
144
        :param * event_data: Not used.
145
        :param * listener_data: Not used.
146
        """
147
        del event, event_data, listener_data
148
149
        # Make a poller for all incoming sockets.
150
        poller = zmq.Poller()
151
        for socket in self._end_points.values():
152
            if socket.type in [zmq.PULL, zmq.REP]:
153
                poller.register(socket, zmq.POLLIN)
154
155
        # Wait for socket is ready for reading.
156
        socks = dict(poller.poll())
157
158
        for name, socket in self._end_points.items():
159
            if socket in socks:
160
                self._receive_message(name, socket)
161
162
    # ------------------------------------------------------------------------------------------------------------------
163
    def send_message(self, end_point, message):
164
        """
165
        Sends a message to an end point.
166
167
        :param str end_point: The name of the end point.
168
        :param enarksh.message.Message.Message message: The message.
169
        """
170
        socket = self._end_points[end_point]
171
        socket.send_pyobj(message)
172
173
    # ------------------------------------------------------------------------------------------------------------------
174
    def no_barking(self, seconds):
175
        """
176
        During start up of ZMQ the incoming file descriptors become 'ready for reading' while there is no message on
177
        the socket. This method prevent incoming sockets barking that the are ready the for reading.
178
179
        :param int seconds: The number of seconds the give the other ZMQ thread to start up.
180
        """
181
        sleep(seconds)
182
183
        for _ in range(1, len(self.end_points)):
184
            poller = zmq.Poller()
185
            for socket in self.end_points.values():
186
                if socket.type in [zmq.PULL, zmq.REP]:
187
                    poller.register(socket, zmq.POLLIN)
188
189
            poller.poll(1)
190
191
# ----------------------------------------------------------------------------------------------------------------------
192