GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Controller.__shutdown()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 5
ccs 0
cts 2
cp 0
crap 2
rs 9.4285
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import gc
9
import logging
10
import os
11
import pwd
12
13
import zmq
14
15
from enarksh.Config import Config
16
from enarksh.Credentials import Credentials
17
from enarksh.DataLayer import DataLayer
18
from enarksh.controller import resource
19
from enarksh.controller.Schedule import Schedule
20
from enarksh.controller.event_handler.DynamicWorkerDefinitionMessageEventHandler import \
21
    DynamicWorkerDefinitionMessageEventHandler
22
from enarksh.controller.event_handler.EventQueueEmptyEventHandler import EventQueueEmptyEventHandler
23
from enarksh.controller.event_handler.HaltMessageEventHandler import HaltMessageEventHandler
24
from enarksh.controller.event_handler.JobFinishedMessageEventHandler import JobFinishedMessageEventHandler
25
from enarksh.controller.event_handler.MailOperatorEventHandler import MailOperatorEventHandler
26
from enarksh.controller.event_handler.NagiosMessageEventHandler import NagiosMessageEventHandler
27
from enarksh.controller.event_handler.NodeActionMessageEventHandler import NodeActionMessageEventHandler
28
from enarksh.controller.event_handler.NodeActionMessageWebEventHandler import NodeActionMessageWebEventHandler
29
from enarksh.controller.event_handler.PossibleNodeActionsWebMessageEventHandler import \
30
    PossibleNodeActionsWebMessageEventHandler
31
from enarksh.controller.event_handler.ScheduleDefinitionMessageEventHandler import ScheduleDefinitionMessageEventHandler
32
from enarksh.controller.message.DynamicWorkerDefinitionMessage import DynamicWorkerDefinitionMessage
33
from enarksh.controller.message.JobFinishedMessage import JobFinishedMessage
34
from enarksh.controller.message.NagiosMessage import NagiosMessage
35
from enarksh.controller.message.NodeActionMessage import NodeActionMessage
36
from enarksh.controller.message.NodeActionWebMessage import NodeActionWebMessage
37
from enarksh.controller.message.PossibleNodeActionsWebMessage import PossibleNodeActionsWebMessage
38
from enarksh.controller.message.ScheduleDefinitionMessage import ScheduleDefinitionMessage
39
from enarksh.controller.node import Node
40
from enarksh.event.Event import Event
41
from enarksh.event.EventActor import EventActor
42
from enarksh.event.EventController import EventController
43
from enarksh.message.HaltMessage import HaltMessage
44
from enarksh.message.MessageController import MessageController
45
46
47
class Controller(EventActor):
48
    """
49
    The controller of Enarksh.
50
    """
51
    # ------------------------------------------------------------------------------------------------------------------
52
    def __init__(self):
        """
        Object constructor.

        Creates the event controller and the message controller, the (initially empty) registries for host level
        resources and schedules, the logger, and the class level event Node.event_new_node_creation.
        """
        # The event controller.
        # NOTE(review): it is assigned before EventActor.__init__() runs; presumably the base initialiser relies on
        # it, so keep this order -- confirm against EventActor.
        # :type: enarksh.event.EventController.EventController
        self.event_controller = EventController()

        EventActor.__init__(self)

        # The message controller.
        # :type: enarksh.message.MessageController.MessageController
        self.message_controller = MessageController()

        # All resources defined at host level.
        # :type: dict
        self.host_resources = dict()

        # All the current schedules.
        # :type: dict[int,enarksh.controller.Schedule.Schedule]
        self.schedules = dict()

        # The logger.
        # :type: logging.Logger
        self.__log = logging.getLogger('enarksh')

        # Create the event for "a new node has been created". This controller is passed to the event's constructor.
        Node.event_new_node_creation = Event(self)
95
96
    # ------------------------------------------------------------------------------------------------------------------
97
    @staticmethod
    def __set_unprivileged_user():
        """
        Set the real and effective user and group to an unprivileged user.

        The unprivileged account is looked up in the password database under the name 'enarksh'. The saved
        set-group-ID and the saved set-user-ID are both set to 0.
        """
        account = pwd.getpwnam('enarksh')
        unprivileged_uid = account.pw_uid
        unprivileged_gid = account.pw_gid

        # The group IDs are switched before the user IDs.
        os.setresgid(unprivileged_gid, unprivileged_gid, 0)
        os.setresuid(unprivileged_uid, unprivileged_uid, 0)
106
107
    # ------------------------------------------------------------------------------------------------------------------
108
    def __create_host_resources(self):
        """
        Creates resources defined at host level.

        Every row returned by DataLayer.enk_back_get_host_resources() is turned into a resource object that is
        stored in self.host_resources under the row's 'rsc_id'.
        """
        for row in DataLayer.enk_back_get_host_resources():
            host_resource = resource.create_resource(row)
            self.host_resources[row['rsc_id']] = host_resource
115
116
    # ------------------------------------------------------------------------------------------------------------------
117
    def __startup(self):
        """
        Performs the necessary actions for starting the controller daemon.

        In order: configures and opens the database connection, sanitises the data in the database, creates the
        resources defined at host level, drops to the unprivileged user, and commits the transaction.
        """
        self.__log.info('Starting controller')

        credentials = Credentials.get()

        # Set database configuration options. Each getter is called right before its option is stored.
        connection_options = (('host', credentials.get_host),
                              ('user', credentials.get_user),
                              ('password', credentials.get_password),
                              ('database', credentials.get_database),
                              ('port', credentials.get_port))
        for option, getter in connection_options:
            DataLayer.config[option] = getter()
        DataLayer.config['autocommit'] = False

        # Connect to MySQL and start a transaction.
        DataLayer.connect()
        DataLayer.start_transaction()

        # Sanitise the data in the database.
        DataLayer.enk_back_controller_init()

        # Create resources defined at host level.
        self.__create_host_resources()

        # Set the effective user and group to an unprivileged user and group.
        self.__set_unprivileged_user()

        # Commit the transaction. Nothing in this method closes the connection to MySQL.
        DataLayer.commit()
148
149
    # ------------------------------------------------------------------------------------------------------------------
150
    def __shutdown(self):
151
        """
152
        Performs the necessary actions for stopping the controller.
153
        """
154
        self.__log.info('Stopping controller')
155
156
    # ------------------------------------------------------------------------------------------------------------------
157
    def slot_schedule_termination(self, event, rst_id, _listener_data):
158
        """
159
160
        :param enarksh.event.Event.Event event: The event.
161
        :param int rst_id: The run status of the schedule.
162
        :param * _listener_data: Not used
163
        """
164
        del _listener_data
165
166
        schedule = event.source
167
168
        self.__log.info('Schedule {} has terminated with status {}'.format(schedule.sch_id, rst_id))
169
170
        self.unload_schedule(schedule.sch_id)
171
172
    # ------------------------------------------------------------------------------------------------------------------
173
    def load_schedule(self, sch_id):
        """
        Loads a schedule and registers it in self.schedules under its ID.

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.controller.Schedule.Schedule
        """
        self.__log.info('Loading schedule {}'.format(sch_id))

        # Load the schedule and listen for its termination (see slot_schedule_termination).
        loaded = Schedule(sch_id, self.host_resources)
        loaded.event_schedule_termination.register_listener(self.slot_schedule_termination)

        # Register the schedule; an entry already stored under this ID is overwritten.
        self.schedules[sch_id] = loaded

        return loaded
189
190
    # ------------------------------------------------------------------------------------------------------------------
191
    def unload_schedule(self, sch_id):
192
        """
193
        :param int sch_id:
194
        """
195
        self.__log.info('Unloading schedule {}'.format(sch_id))
196
197
        if sch_id in self.schedules:
198
            # Remove the schedule.
199
            del self.schedules[sch_id]
200
201
            gc.collect()
202
203
    # ------------------------------------------------------------------------------------------------------------------
204
    def reload_schedule(self, sch_id):
        """
        Unloads a schedule (if it is currently loaded) and loads it again.

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.controller.Schedule.Schedule
        """
        self.unload_schedule(sch_id)
        reloaded = self.load_schedule(sch_id)

        return reloaded
213
214
    # ------------------------------------------------------------------------------------------------------------------
215
    def get_schedule_by_sch_id(self, sch_id):
216
        """
217
        Returns a schedule.
218
219
        :param int sch_id: The ID of the schedule.
220
221
        :rtype: enarksh.controller.Schedule.Schedule
222
        """
223
        schedule = self.schedules.get(int(sch_id), None)
224
        if not schedule:
225
            # Load the schedule if the schedule is not currently loaded.
226
            schedule = self.load_schedule(sch_id)
227
228
        return schedule
229
230
    # ------------------------------------------------------------------------------------------------------------------
231
    def __register_sockets(self):
        """
        Registers ZMQ sockets for communication with other processes in Enarksh.

        The end points are read from the configuration and registered at the message controller.
        """
        config = Config.get()

        end_points = (
            # Socket for receiving asynchronous incoming messages.
            ('pull', zmq.PULL, config.get_controller_pull_end_point),
            # Socket for lockstep incoming messages.
            ('lockstep', zmq.REP, config.get_controller_lockstep_end_point),
            # Socket for sending asynchronous messages to the spawner.
            ('spawner', zmq.PUSH, config.get_spawner_pull_end_point),
            # Socket for sending asynchronous messages to the logger.
            ('logger', zmq.PUSH, config.get_logger_pull_end_point),
        )

        for name, socket_type, get_end_point in end_points:
            self.message_controller.register_end_point(name, socket_type, get_end_point())
248
249
    # ------------------------------------------------------------------------------------------------------------------
250
    def __register_message_types(self):
        """
        Registers all message types that the controller handles at the message controller.

        For the two web messages a creator for building the message from JSON is registered as well.
        """
        handled_messages = (DynamicWorkerDefinitionMessage,
                            JobFinishedMessage,
                            HaltMessage,
                            NagiosMessage,
                            NodeActionMessage,
                            ScheduleDefinitionMessage,
                            NodeActionWebMessage,
                            PossibleNodeActionsWebMessage)
        for message_class in handled_messages:
            self.message_controller.register_message_type(message_class.MESSAGE_TYPE)

        # Register JSON messages.
        for message_class in (PossibleNodeActionsWebMessage, NodeActionWebMessage):
            self.message_controller.register_json_message_creator(message_class.MESSAGE_TYPE,
                                                                  message_class.create_from_json)
268
269
    # ------------------------------------------------------------------------------------------------------------------
270
    def __register_events_handlers(self):
        """
        Registers all event handlers.

        Message handlers are registered at the message controller, with this controller as listener data. The
        handler for an empty event queue is registered at the event controller, and the handler for the creation
        of a new node at Node.event_new_node_creation.
        """
        # Register message received event handlers.
        message_handlers = ((DynamicWorkerDefinitionMessage, DynamicWorkerDefinitionMessageEventHandler),
                            (JobFinishedMessage, JobFinishedMessageEventHandler),
                            (HaltMessage, HaltMessageEventHandler),
                            (NagiosMessage, NagiosMessageEventHandler),
                            (NodeActionMessage, NodeActionMessageEventHandler),
                            (ScheduleDefinitionMessage, ScheduleDefinitionMessageEventHandler),
                            (NodeActionWebMessage, NodeActionMessageWebEventHandler),
                            (PossibleNodeActionsWebMessage, PossibleNodeActionsWebMessageEventHandler))
        for message_class, handler_class in message_handlers:
            self.message_controller.register_listener(message_class.MESSAGE_TYPE, handler_class.handle, self)

        # Register other event handlers.
        queue_empty_event = self.event_controller.event_queue_empty
        queue_empty_event.register_listener(EventQueueEmptyEventHandler.handle, self)

        node_creation_event = Node.event_new_node_creation
        node_creation_event.register_listener(MailOperatorEventHandler.handle_node_creation)
304
305
    # ------------------------------------------------------------------------------------------------------------------
306
    def main(self):
        """
        The main function of the controller.

        Starts the controller, registers sockets, message types and event handlers, runs the event loop and,
        once the loop has returned, performs the shutdown actions.
        """
        preparations = (self.__startup,
                        self.__register_sockets,
                        self.__register_message_types,
                        self.__register_events_handlers)
        for prepare in preparations:
            prepare()

        # NOTE(review): the meaning of the argument 5 is defined by MessageController.no_barking(), which is not
        # visible here -- confirm before changing it.
        self.message_controller.no_barking(5)

        self.event_controller.loop()

        self.__shutdown()
323
324
# ----------------------------------------------------------------------------------------------------------------------
325