Passed
Push to master (0342e8...c99b2a) by P.R. at 02:59, created

Controller   B

Complexity
  Total Complexity   51

Size/Duplication
  Total Lines        526
  Duplicated Lines   0 %

Test Coverage
  Coverage           0%

Importance
  Changes 2, Bugs 0, Features 0

Metric  Value
c       2
b       0
f       0
dl      0
loc     526
ccs     0
cts     214
cp      0
rs      8.3206
wmc     51

2 Methods

Rating  Name         Duplication  Size  Complexity
B       __init__()   0            41    1
A       daemonize()  0            6     1
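The report lists a test coverage of 0%. A natural first test targets the static comparator `Controller.queue_compare`, which needs neither the database nor ZeroMQ. The sketch below is a hypothetical `unittest` module, not part of Enarksh: it copies the comparator (with `get_schedule_load()` called on both schedules) so it runs without enarksh installed, and `StubSchedule` is an assumed stand-in for `enarksh.Controller.Schedule.Schedule` that models only the two methods used here.

```python
import functools
import unittest


class StubSchedule:
    """Hypothetical stand-in for enarksh.Controller.Schedule.Schedule: only the ID and the load are modelled."""

    def __init__(self, sch_id: int, load: int) -> None:
        self._sch_id = sch_id
        self._load = load

    def get_sch_id(self) -> int:
        return self._sch_id

    def get_schedule_load(self) -> int:
        return self._load


def queue_compare(schedule1, schedule2):
    # Copy of Controller.queue_compare with get_schedule_load() called on both schedules.
    # Negative when schedule1 has the higher load, so cmp_to_key sorts the most loaded schedule first.
    return -(schedule1.get_schedule_load() - schedule2.get_schedule_load())


class QueueCompareTest(unittest.TestCase):
    def test_highest_load_first(self):
        schedules = {1: StubSchedule(1, 5), 2: StubSchedule(2, 9), 3: StubSchedule(3, 0)}
        # Sort the schedule objects (the dict values), not the sch_id keys.
        ordered = sorted(schedules.values(), key=functools.cmp_to_key(queue_compare))
        self.assertEqual([schedule.get_sch_id() for schedule in ordered], [2, 1, 3])

    def test_equal_load_compares_equal(self):
        self.assertEqual(queue_compare(StubSchedule(1, 4), StubSchedule(2, 4)), 0)
```

Saved as, for example, `test_queue_compare.py`, it can be run with `python -m unittest test_queue_compare`. Sorting the dictionary values rather than its keys matters here: the comparator calls methods on schedule objects and would fail on integer IDs.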

How to fix: Complexity

Complex Class

Complex classes like Controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefix or suffix.
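Shared prefixes and suffixes can be spotted by hand, but a few lines of Python make the grouping explicit. The helper below is a hypothetical sketch (`shared_affixes` is not part of Enarksh or of the analysis tool): it splits each member name on underscores and groups names by their first and last word. The member list is a subset of Controller's fields and methods.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def shared_affixes(names: Iterable[str], min_size: int = 2) -> Dict[str, List[str]]:
    """Group member names by their first word (prefix) and last word (suffix).

    Words are the underscore-separated parts of a name, so '_zmq_pull_socket' has the
    prefix 'zmq' and the suffix 'socket'. Only groups with at least min_size members are kept.
    """
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        words = [word for word in name.split('_') if word]
        if len(words) < 2:
            continue  # A one-word name such as '_schedules' has no separate prefix and suffix.
        groups['prefix:' + words[0]].append(name)
        groups['suffix:' + words[-1]].append(name)
    return {key: members for key, members in groups.items() if len(members) >= min_size}


# A subset of Controller's fields and methods.
CONTROLLER_MEMBERS = ['_zmq_context', '_zmq_pull_socket', '_zmq_lockstep_socket', '_zmq_spanner',
                      '_zmq_logger', '_zmq_init', '_host_resources', '_schedules', '_load_schedule',
                      '_unload_schedule', '_reload_schedule', '_message_handler',
                      '_message_handler_node_stop', '_message_handler_schedule_definition']

groups = shared_affixes(CONTROLLER_MEMBERS)
for key, members in groups.items():
    print(key, members)
```

For Controller, the `prefix:zmq` group (the context, the four sockets, and `_zmq_init()`) is the most obvious candidate component; the `suffix:schedule` group (`_load_schedule`, `_unload_schedule`, `_reload_schedule`) is another.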

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
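Applied to Controller, the fields and methods prefixed with `_zmq_` could be extracted into their own class. The sketch below is one possible shape, not the project's code: `ControllerSockets`, its attribute names, the `socket_factory` parameter, and the end point strings are all hypothetical. The factory stands in for `zmq.Context().socket(...)` so that the wiring can be exercised with a recording test double instead of real ZeroMQ sockets.

```python
from typing import Any, Callable, Dict, List, Optional


class ControllerSockets:
    """Hypothetical class extracted from Controller: owns the former `_zmq_*` fields and `_zmq_init()`."""

    def __init__(self, socket_factory: Callable[[str], Any], end_points: Dict[str, str]) -> None:
        # socket_factory('PULL') is assumed to return an object with bind()/connect(),
        # e.g. a pyzmq socket; end_points maps a role name to an end point string.
        self._socket_factory = socket_factory
        self._end_points = end_points
        self.pull: Optional[Any] = None
        self.lockstep: Optional[Any] = None
        self.spanner: Optional[Any] = None
        self.logger: Optional[Any] = None

    def init(self) -> None:
        # Same wiring as Controller._zmq_init(): bind the inbound sockets, connect the outbound ones.
        self.pull = self._socket_factory('PULL')
        self.pull.bind(self._end_points['controller_pull'])
        self.lockstep = self._socket_factory('REP')
        self.lockstep.bind(self._end_points['controller_lockstep'])
        self.spanner = self._socket_factory('PUSH')
        self.spanner.connect(self._end_points['spawner_pull'])
        self.logger = self._socket_factory('PUSH')
        self.logger.connect(self._end_points['logger_pull'])


class Controller:
    """Slimmed-down Controller that delegates socket handling to ControllerSockets."""

    def __init__(self, sockets: ControllerSockets) -> None:
        self._sockets = sockets
        self._host_resources: Dict[int, Any] = {}
        self._schedules: Dict[int, Any] = {}

    def _startup(self) -> None:
        # Database and resource setup omitted; only the socket step of the original _startup() remains.
        self._sockets.init()


class RecordingSocket:
    """Test double that records bind()/connect() calls instead of opening real sockets."""

    def __init__(self, kind: str) -> None:
        self.kind = kind
        self.bound: List[str] = []
        self.connected: List[str] = []

    def bind(self, end_point: str) -> None:
        self.bound.append(end_point)

    def connect(self, end_point: str) -> None:
        self.connected.append(end_point)


# Made-up end points for this example only.
END_POINTS = {'controller_pull': 'inproc://controller-pull',
              'controller_lockstep': 'inproc://controller-lockstep',
              'spawner_pull': 'inproc://spawner-pull',
              'logger_pull': 'inproc://logger-pull'}

controller = Controller(ControllerSockets(RecordingSocket, END_POINTS))
controller._startup()
```

With pyzmq, the factory could be `lambda kind: context.socket(getattr(zmq, kind))` for a `context = zmq.Context()`, and the end points would come from `enarksh.CONTROLLER_PULL_END_POINT` and the related constants. Controller then keeps its schedule and resource logic, while socket setup can be tested in isolation.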

"""
Enarksh

Copyright 2013-2016 Set Based IT Consultancy

Licence MIT
"""
import os
import pwd
import sys
import traceback
import functools
import gc

import zmq

import enarksh
from enarksh.Controller import Resource
from enarksh.DataLayer import DataLayer
from enarksh.Controller.Schedule import Schedule
from enarksh.XmlReader.Node.FakeParent import FakeParent
from enarksh.XmlReader.XmlReader import XmlReader


class Controller:
    """
    The job controller of Enarksh. Loads schedules, starts queued nodes for which enough resources are available, and
    handles the messages sent by the spanner, the operator, and the web interface.
    """
    _instance = None

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self):
        Controller._instance = self

        self._zmq_context = None
        """
        :type: zmq.Context
        """

        self._zmq_pull_socket = None
        """
        :type: zmq.sugar.socket.Socket
        """

        self._zmq_lockstep_socket = None
        """
        :type: zmq.sugar.socket.Socket
        """

        self._zmq_spanner = None
        """
        :type: zmq.sugar.socket.Socket
        """

        self._zmq_logger = None
        """
        :type: zmq.sugar.socket.Socket
        """

        self._host_resources = {}
        """
        All resources defined at host level.

        :type: dict
        """

        self._schedules = {}
        """
        All the current schedules.

        :type: dict
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _zmq_init(self):
        """
        Creates the ZeroMQ context and the sockets used by the controller.
        """
        self._zmq_context = zmq.Context()

        # Create socket for asynchronous incoming messages.
        self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL)
        self._zmq_pull_socket.bind(enarksh.CONTROLLER_PULL_END_POINT)

        # Create socket for lockstep incoming messages.
        self._zmq_lockstep_socket = self._zmq_context.socket(zmq.REP)
        self._zmq_lockstep_socket.bind(enarksh.CONTROLLER_LOCKSTEP_END_POINT)

        # Create socket for sending asynchronous messages to the spanner.
        self._zmq_spanner = self._zmq_context.socket(zmq.PUSH)
        self._zmq_spanner.connect(enarksh.SPAWNER_PULL_END_POINT)

        # Create socket for sending asynchronous messages to the logger.
        self._zmq_logger = self._zmq_context.socket(zmq.PUSH)
        self._zmq_logger.connect(enarksh.LOGGER_PULL_END_POINT)

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def daemonize():
        """
        Turns the controller into a daemon, with its PID and log files under enarksh.HOME.
        """
        enarksh.daemonize(enarksh.HOME + '/var/lock/controllerd.pid',
                          '/dev/null',
                          enarksh.HOME + '/var/log/controllerd.log',
                          enarksh.HOME + '/var/log/controllerd.log')

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _set_unprivileged_user():
        """
        Sets the real and effective user and group to the unprivileged user 'enarksh'.
        """
        _, _, uid, gid, _, _, _ = pwd.getpwnam('enarksh')

        os.setresgid(gid, gid, 0)
        os.setresuid(uid, uid, 0)

    # ------------------------------------------------------------------------------------------------------------------
    def _create_host_resources(self):
        """
        Creates resources defined at host level.
        """
        resources_data = DataLayer.enk_back_get_host_resources()
        for resource_data in resources_data:
            self._host_resources[resource_data['rsc_id']] = Resource.create_resource(resource_data)

    # ------------------------------------------------------------------------------------------------------------------
    def _startup(self):
        """
        Performs the necessary actions for starting the controller daemon.
        """
        print('Start controller.')

        # Set database configuration options.
        DataLayer.config['host'] = enarksh.MYSQL_HOSTNAME
        DataLayer.config['user'] = enarksh.MYSQL_USERNAME
        DataLayer.config['password'] = enarksh.MYSQL_PASSWORD
        DataLayer.config['database'] = enarksh.MYSQL_SCHEMA
        DataLayer.config['port'] = enarksh.MYSQL_PORT
        DataLayer.config['autocommit'] = False

        # Connect to MySQL.
        DataLayer.connect()

        # Sanitise the data in the database.
        DataLayer.enk_back_controller_init()

        # Create resources defined at host level.
        self._create_host_resources()

        # Set the effective user and group to an unprivileged user and group.
        # self._set_unprivileged_user()

        # Become a daemon.
        # self.daemonize()

        self._zmq_init()

        # Housekeeping to track statuses of nodes.
        Schedule.register_observer_schedule_termination(self.slot_schedule_termination)

        # Commit transaction and close connection to MySQL.
        DataLayer.commit()
        DataLayer.disconnect()

    # ------------------------------------------------------------------------------------------------------------------
    def slot_schedule_termination(self, schedule, rst_id):
        """
        Handles the termination of a schedule by unloading the schedule.

        :param enarksh.Controller.Schedule.Schedule schedule:
        :param int rst_id:
        """
        print("Schedule %s has terminated with status %s" % (schedule.get_sch_id(), rst_id))

        self._unload_schedule(schedule.get_sch_id())

    # ------------------------------------------------------------------------------------------------------------------
    def _load_schedule(self, sch_id):
        """
        Loads a schedule and registers it as a current schedule.

        :param int sch_id:

        :rtype: enarksh.Controller.Schedule.Schedule
        """
        print("Loading schedule '%s'." % sch_id)

        # Load the schedule.
        schedule = Schedule(sch_id, self._host_resources)

        # Register the schedule.
        self._schedules[sch_id] = schedule

        return schedule

    # ------------------------------------------------------------------------------------------------------------------
    def _unload_schedule(self, sch_id):
        """
        Unloads a schedule, if it is currently loaded.

        :param int sch_id:
        """
        print("Unloading schedule '%s'." % sch_id)

        if sch_id in self._schedules:
            schedule = self._schedules[sch_id]

            # Remove the schedule.
            del self._schedules[sch_id]

            schedule.destroy()

            gc.collect()

    # ------------------------------------------------------------------------------------------------------------------
    def _reload_schedule(self, sch_id):
        """
        Unloads a schedule and loads it again.

        :param int sch_id:

        :rtype: enarksh.Controller.Schedule.Schedule
        """
        self._unload_schedule(sch_id)

        return self._load_schedule(sch_id)

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def queue_compare(schedule1, schedule2):
        """
        Compares two schedules for sorting queued nodes. The schedule with the highest load sorts first.

        :param enarksh.Controller.Schedule.Schedule schedule1:
        :param enarksh.Controller.Schedule.Schedule schedule2:

        :rtype: int
        """
        return -(schedule1.get_schedule_load() - schedule2.get_schedule_load())

    # ------------------------------------------------------------------------------------------------------------------
    def _queue_handler(self):
        """
        Starts nodes that are queued and for which there are enough resources available.
        """
        # Return immediately if there are no loaded schedules.
        if not self._schedules:
            return

        DataLayer.connect()
        DataLayer.start_transaction()

        while True:
            start = False
            if self._schedules:
                # Sort the schedule objects (not their IDs): queue_compare needs the schedules themselves.
                schedules = sorted(self._schedules.values(), key=functools.cmp_to_key(Controller.queue_compare))
                schedule = schedules[0]
                queue = schedule.get_queue()
                for node in queue:
                    # Inquire if there are enough resources available for the node.
                    start = node.inquire_resources()
                    if start:
                        span_job = node.start()

                        # If required, send a message to the spanner.
                        if span_job:
                            message = node.get_start_message()
                            message['sch_id'] = schedule.get_sch_id()
                            self._zmq_spanner.send_json(message)

                        else:
                            node.stop(0)

                        # If a node has been started, leave the inner loop.
                        break

            # If no node has been started, leave the outer loop.
            if not start:
                break

        # Commit the last transaction (if any) and close the connection to the database.
        DataLayer.commit()
        DataLayer.disconnect()

    # ------------------------------------------------------------------------------------------------------------------
    def _get_schedule_by_sch_id(self, sch_id):
        """
        Returns a schedule. Loads the schedule if it is not currently loaded.

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.Controller.Schedule.Schedule
        """
        sch_id = int(sch_id)
        schedule = self._schedules.get(sch_id, None)
        if not schedule:
            # Load the schedule if the schedule is not currently loaded.
            schedule = self._load_schedule(sch_id)

        return schedule

    # ------------------------------------------------------------------------------------------------------------------
    def _get_possible_node_actions(self, sch_id, rnd_id):
        """
        Returns the possible actions for a node.

        :param int sch_id: The ID of the schedule of the node.
        :param int rnd_id: The ID of the node.

        :return: Dictionary with possible node actions.
        :rtype: dict[str, bool|dict[int, dict[str, mixed]]]
        """
        message = {'actions': {enarksh.ENK_ACT_ID_TRIGGER: {'act_id': enarksh.ENK_ACT_ID_TRIGGER,
                                                            'act_title': 'Trigger',
                                                            'act_enabled': False},
                               enarksh.ENK_ACT_ID_RESTART: {'act_id': enarksh.ENK_ACT_ID_RESTART,
                                                            'act_title': 'Restart',
                                                            'act_enabled': False},
                               enarksh.ENK_ACT_ID_RESTART_FAILED: {'act_id': enarksh.ENK_ACT_ID_RESTART_FAILED,
                                                                   'act_title': 'Restart Failed',
                                                                   'act_enabled': False}},
                   'mail_on_completion': False,
                   'mail_on_error': False}

        # Find the schedule of the node.
        schedule = self._get_schedule_by_sch_id(sch_id)
        if not schedule:
            # Node rnd_id is not part of a current run of a current schedule revision.
            return message

        return schedule.request_possible_node_actions(rnd_id, message)

    # ------------------------------------------------------------------------------------------------------------------
    def _message_handler_request_possible_node_actions(self, message):
        """
        Handles a request (from the web interface) for the possible actions of a certain node.

        :param dict message: The message of the request.
        """
        sch_id = int(message['sch_id'])
        rnd_id = int(message['rnd_id'])

        response = self._get_possible_node_actions(sch_id, rnd_id)

        # Send the message to the web interface.
        self._zmq_lockstep_socket.send_json(response)

    # ------------------------------------------------------------------------------------------------------------------
    def _message_handler_schedule_definition(self, message):
        """
        Handles a request (from the operator) for loading a new schedule.

        :param dict message: The message of the request.
        """
        try:
            # Validate XML against XSD.
            reader = XmlReader()
            schedule = reader.parse_schedule(message['xml'], message['filename'])

            # Test whether the schedule is currently running.
            name = schedule.name
            if name in self._schedules:
                raise Exception("Schedule '%s' is currently running." % name)

            # Insert the XML definition as a BLOB into the database.
            blb_id = DataLayer.enk_blob_insert_blob(os.path.basename(message['filename']), 'text/xml', message['xml'])
            srv_id = DataLayer.enk_reader_schedule_create_revision(blb_id, name)
            if not srv_id:
                raise Exception("Schedule '%s' is already loaded." % name)

            # Store the new schedule definition into the database.
            schedule.store(srv_id, 1)
            DataLayer.enk_back_schedule_revision_create_run(srv_id)

            response = {'ret': 0,
                        'message': "Schedule '%s' successfully loaded." % name}
        except Exception as exception:
            print(exception, file=sys.stderr)
            response = {'ret': -1,
                        'message': str(exception)}
            traceback.print_exc(file=sys.stderr)

        self._zmq_lockstep_socket.send_json(response)

    # ------------------------------------------------------------------------------------------------------------------
    def _message_handler_dynamic_worker_definition(self, message):
        """
        Handles a request for loading a dynamic worker definition.

        :param dict message: The message of the request.
        """
        try:
            sch_id = int(message['sch_id'])
            rnd_id = int(message['rnd_id'])

            print('rnd_id: ' + str(rnd_id))

            # Get info of the dynamic node.
            info = DataLayer.enk_back_run_node_get_dynamic_info_by_generator(rnd_id)

            schedule = self._get_schedule_by_sch_id(sch_id)
            parent = FakeParent(schedule,
                                self._host_resources,
                                info['nod_id_outer_worker'],
                                info['rnd_id_outer_worker'])

            # Validate XML against XSD.
            reader = XmlReader()
            inner_worker = reader.parse_dynamic_worker(message['definition'], parent)
            name = inner_worker.name

            # Note: Dynamic node is the parent of the worker node which is the parent of the inner worker node.
            inner_worker.set_levels(info['nod_recursion_level'] + 2)

            # Store the dynamically defined inner worker node.
            inner_worker.store(info['srv_id'], 0)

            # Create dependencies between the input and output port of the worker node and its child node(s).
            DataLayer.enk_back_node_dynamic_add_dependencies(info['nod_id_outer_worker'], inner_worker.nod_id)

            # XXX trigger reload of front end

            # Unload the schedule to force a reload of the schedule with new nodes added.
            self._unload_schedule(sch_id)

            response = {'ret': 0,
                        'message': "Worker '%s' successfully loaded." % name}

        except Exception as exception:
            print(exception, file=sys.stderr)
            response = {'ret': -1,
                        'message': str(exception)}
            traceback.print_exc(file=sys.stderr)

        self._zmq_lockstep_socket.send_json(response)

    # ------------------------------------------------------------------------------------------------------------------
    def _message_handler_request_node_action(self, message):
        """
        Executes a node action request.

        :param dict message: The message of the request.
        """
        # Compose a response message for the web interface.
        response = {'ret': 0,
                    'new_run': 0,
                    'message': 'OK'}

        try:
            sch_id = int(message['sch_id'])
            rnd_id = int(message['rnd_id'])
            act_id = int(message['act_id'])

            actions = self._get_possible_node_actions(sch_id, rnd_id)
            if act_id not in actions['actions'] or not actions['actions'][act_id]['act_enabled']:
                response['ret'] = -1
                response['message'] = 'Not a valid action'
            else:
                schedule = self._get_schedule_by_sch_id(sch_id)
                reload = schedule.request_node_action(rnd_id,
                                                      act_id,
                                                      message['usr_login'],
                                                      message['mail_on_completion'],
                                                      message['mail_on_error'])
                if reload:
                    # Schedule must be reloaded.
                    schedule = self._reload_schedule(sch_id)
                    # A reload is only required when the schedule has been triggered. However, this trigger is lost by
                    # reloading the schedule. So, resend the trigger.
                    schedule.request_node_action(schedule.get_activate_node().get_rnd_id(),
                                                 act_id,
                                                 message['usr_login'],
                                                 message['mail_on_completion'],
                                                 message['mail_on_error'])

                    if act_id == enarksh.ENK_ACT_ID_TRIGGER:
                        response['new_run'] = 1

        except Exception as exception:
            print(exception, file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

            response['ret'] = -1
            response['message'] = 'Internal error'

        # Send the message to the web interface.
        self._zmq_lockstep_socket.send_json(response)

    # ------------------------------------------------------------------------------------------------------------------
    def _message_handler_node_stop(self, message):
        """
        Handles a message sent by the spanner after a job has finished.

        :param dict message: The message sent by the spanner.
        """
        schedule = self._get_schedule_by_sch_id(message['sch_id'])
        schedule.event_node_stop(message['rnd_id'], message['exit_status'])

    # ------------------------------------------------------------------------------------------------------------------
    def _message_handler(self):
        """
        Waits for new messages and processes these messages.
        """
        poller = zmq.Poller()
        poller.register(self._zmq_pull_socket, zmq.POLLIN)
        poller.register(self._zmq_lockstep_socket, zmq.POLLIN)

        socks = dict(poller.poll())

        DataLayer.connect()
        DataLayer.start_transaction()

        if self._zmq_pull_socket in socks:
            message = self._zmq_pull_socket.recv_json()

            if message['type'] == 'node_stop':
                self._message_handler_node_stop(message)

            else:
                raise Exception("Unknown message type '%s'." % message['type'])

        if self._zmq_lockstep_socket in socks:
            message = self._zmq_lockstep_socket.recv_json()

            if message['type'] == 'schedule_definition':
                self._message_handler_schedule_definition(message)

            elif message['type'] == 'request_node_action':
                self._message_handler_request_node_action(message)

            elif message['type'] == 'request_possible_node_actions':
                self._message_handler_request_possible_node_actions(message)

            elif message['type'] == 'dynamic_worker_definition':
                self._message_handler_dynamic_worker_definition(message)

            else:
                raise Exception("Unknown message type '%s'." % message['type'])

        DataLayer.commit()
        DataLayer.disconnect()

    # ------------------------------------------------------------------------------------------------------------------
    def main(self):
        """
        The main function of the job controller.
        """
        # Perform the necessary actions for starting the controller.
        self._startup()

        while True:
            try:
                # Wait for new messages and process these messages.
                self._message_handler()

                # Try to start nodes that are queued.
                self._queue_handler()

            except Exception as exception1:
                try:
                    DataLayer.rollback()
                    DataLayer.disconnect()
                except Exception as exception2:
                    print(exception2, file=sys.stderr)
                    traceback.print_exc(file=sys.stderr)
                print(exception1, file=sys.stderr)
                traceback.print_exc(file=sys.stderr)


# ----------------------------------------------------------------------------------------------------------------------
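Each `if`/`elif` branch in `_message_handler()` adds to the class's total complexity, and every new message type means another branch. A dispatch table that maps the `type` field of a message to a handler is a common way to flatten such chains. The sketch below is hypothetical (`MessageDispatcher` and `UnknownMessageType` are not part of Enarksh) and shows only the routing, without ZeroMQ or the database; in Controller, one dispatcher per socket could register bound methods such as `self._message_handler_node_stop`.

```python
from typing import Any, Callable, Dict, List, Tuple

Message = Dict[str, Any]


class UnknownMessageType(Exception):
    """Raised when no handler is registered for the 'type' of a message."""


class MessageDispatcher:
    """Maps message['type'] to a handler instead of using an if/elif chain."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Message], None]] = {}

    def register(self, message_type: str, handler: Callable[[Message], None]) -> None:
        self._handlers[message_type] = handler

    def dispatch(self, message: Message) -> None:
        handler = self._handlers.get(message['type'])
        if handler is None:
            raise UnknownMessageType("Unknown message type '%s'." % message['type'])
        handler(message)


# Usage: record which handler ran, in place of the real Controller methods.
handled: List[Tuple[str, Any]] = []

lockstep = MessageDispatcher()
lockstep.register('schedule_definition', lambda message: handled.append(('schedule_definition', message['filename'])))
lockstep.register('request_node_action', lambda message: handled.append(('request_node_action', message['act_id'])))

lockstep.dispatch({'type': 'request_node_action', 'act_id': 1})
```

A dedicated exception type also lets the main loop catch unknown message types specifically, rather than relying only on a broad `except Exception`.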