GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Pull Request — master (#4)
by Oleg
08:02
created

Schedule.sch_id()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 8
ccs 0
cts 2
cp 0
crap 2
rs 9.4285
c 0
b 0
f 0
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
from email.mime.text import MIMEText
9
import smtplib
10
import sys
11
import traceback
12
import functools
13
14
import enarksh
15
from enarksh.Controller import Resource
16
from enarksh.Controller import Consumption
17
from enarksh.DataLayer import DataLayer
18
from enarksh.Controller.Node import create_node, Node
0 ignored issues
show
Unused Code introduced by
Unused Node imported from enarksh.Controller.Node
Loading history...
19
20
21
class Schedule:
0 ignored issues
show
Coding Style introduced by
This class should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
22
    _observers_new_node = []
23
    """
24
    The objects that will notified when a schedule has create a new node.
25
26
    :type: list
27
    """
28
29
    _observers_schedule_termination = []
30
    """
31
    The objects that will notified when a schedule has terminated.
32
33
    :type: list
34
    """
35
36
    # ------------------------------------------------------------------------------------------------------------------
37
    def __init__(self, sch_id, host_resources):
38
        """
39
        Object constructor.
40
41
        :param int sch_id:
42
        :param dict host_resources:
43
        """
44
45
        self._sch_id = sch_id
46
        """
47
        The ID of the schedule.
48
49
        :type: int
50
        """
51
52
        self._nodes = {}
53
        """
54
        A map from rnd_id to node for all the current nodes in this schedule.
55
56
        :type: dict
57
        """
58
59
        self._children = {}
60
        """
61
        A map from rnd_id to a list with all child nodes.
62
63
        :type: dict
64
        """
65
66
        self._successors = {}
67
        """
68
        A map from rnd_id to a list with all (direct and indirect) successor nodes.
69
70
        :type: dict
71
        """
72
73
        self._schedule_load = 0
74
        """
75
        The load of this schedule. I.e. the current running (simple) nodes of this schedule.
76
77
        :type: dict
78
        """
79
80
        self._schedule_node = None
81
        """
82
        The node that is the actual schedule.
83
84
        :type: Node
85
        """
86
87
        self._activate_node = None
88
        """
89
        The node that is the activate node of the schedule.
90
91
        :type: Node
92
        """
93
94
        self._arrest_node = None
95
        """
96
        The node that is the arrest node of the schedule.
97
98
        :type: Node
99
        """
100
101
        self._mail_on_completion = True
102
        """
103
        If set a mail must be send to the operator when the schedule is finished running.
104
         :type: bool
105
        """
106
107
        self._mail_on_error = True
108
        """
109
        If set a mail must be send to the operator for each failed (simple) node.
110
111
         :type: bool
112
        """
113
114
        self._usr_login = ''
115
        """
116
        The user ID of the operator.
117
118
        :type: str
119
        """
120
121
        self._queue = set()
122
        """
123
        The queue of nodes that are ready to run.
124
125
        :type: set
126
        """
127
128
        self._load(sch_id, host_resources)
129
130
    # ------------------------------------------------------------------------------------------------------------------
131
    @property
132
    def sch_id(self):
133
        """
134
        Returns the ID of this schedule.
135
136
        :rtype: int
137
        """
138
        return self._sch_id
139
140
    # ------------------------------------------------------------------------------------------------------------------
141
    def __del__(self):
142
        print("Deleting schedule %s" % self._sch_id)
143
144
    # ------------------------------------------------------------------------------------------------------------------
145
    def _load(self, sch_id, host_resources):
146
        """
147
        Loads the schedule from the database.
148
149
        :param int sch_id:
150
        :param dict host_resources:
151
        """
152
        schedule = DataLayer.enk_back_schedule_get_schedule(sch_id)
153
154
        # Fetch all data of the run from the database.
155
        nodes_data = DataLayer.enk_back_run_get_run_nodes(schedule['run_id'])
156
        ports_data = DataLayer.enk_back_run_get_ports1(schedule['run_id'])
157
        node_ports_data = DataLayer.enk_back_run_get_ports2(schedule['run_id'])
158
        dependants_data = DataLayer.enk_back_run_get_dependants(schedule['srv_id'])
159
        consumptions_data = DataLayer.enk_back_run_get_consumptions(schedule['run_id'])
160
        resources_data = DataLayer.enk_back_run_get_resources(schedule['run_id'])
161
162
        # Create a lookup table for all child nodes of a node.
163
        tmp_child_nodes = {}
164
        for node_data in nodes_data.values():
165
            if node_data['rnd_id_parent']:
166
                if node_data['rnd_id_parent'] not in tmp_child_nodes:
167
                    tmp_child_nodes[node_data['rnd_id_parent']] = []
168
                tmp_child_nodes[node_data['rnd_id_parent']].append(node_data)
169
170
        # Create a lookup table for all direct successor nodes of a node.
171
        direct_successors = Schedule._create_successor_lookup_table1(nodes_data,
172
                                                                     tmp_child_nodes,
173
                                                                     node_ports_data,
174
                                                                     ports_data,
175
                                                                     dependants_data)
176
177
        # Create a lookup table for all direct predecessor nodes of a node.
178
        direct_predecessors = Schedule._create_predecessor_lookup_table1(direct_successors)
179
180
        # Create a lookup table for all (direct and indirect) successor nodes of a node.
181
        successors = {}
182
        for rnd_id_predecessor in direct_successors.keys():
183
            successors[rnd_id_predecessor] = {}
184
            for rnd_id_successor in direct_successors[rnd_id_predecessor]:
185
                successors[rnd_id_predecessor][rnd_id_successor] = True
186
                Schedule._create_successor_lookup_table3(successors[rnd_id_predecessor],
187
                                                         direct_successors,
188
                                                         rnd_id_successor)
189
190
        # Create all resources.
191
        resources = {}
192
        for node_resource_data in resources_data.values():
193
            for resource_data in node_resource_data:
194
                resource = Resource.create_resource(resource_data)
195
                resources[resource_data['rsc_id']] = resource
196
197
                # Observe resource for state changes.
198
                resource.register_observer(Schedule.slot_resource_state_change)
199
200
        # Create all consumptions.
201
        consumptions = {}
202
        for node_resource_data in consumptions_data.values():
203
            for consumption_data in node_resource_data:
204
                consumptions[consumption_data['cns_id']] = Consumption.create_consumption(consumption_data,
205
                                                                                          host_resources,
206
                                                                                          resources)
207
208
        # Create all nodes.
209
        for node_data in nodes_data.values():
210
            self._nodes[node_data['rnd_id']] = create_node(node_data)
211
212
        # Initialize all nodes.
213
        # First initialize all simple nodes. This has the effect that signals via StateChange.notify_observer are
214
        # send first to (simple) successor nodes and then to parent nodes (otherwise it is possible that the schedule
215
        # will be set to run status ENK_RST_ID_ERROR while a simple successor node did not yet receive a signal that
216
        # would put this node in run status ENK_RST_ID_QUEUED.
217
        for node_data in nodes_data.values():
218
            node = self._nodes[node_data['rnd_id']]
219
            if node.is_simple_node():
220
                node.initialize(node_data,
221
                                schedule,
222
                                resources,
223
                                resources_data,
224
                                consumptions,
225
                                consumptions_data,
226
                                self._nodes,
227
                                tmp_child_nodes,
228
                                direct_predecessors,
229
                                direct_successors,
230
                                successors)
231
232
                # Observe node for state changes.
233
                node.register_observer(self.slot_node_state_change)
234
235
                # Signal a new node has been created.
236
                self._notify_observers_new_node(self, node)
237
238
        # Second initialize complex nodes.
239
        for node_data in nodes_data.values():
240
            node = self._nodes[node_data['rnd_id']]
241
            if node.is_complex_node():
242
                node.initialize(node_data,
243
                                schedule,
244
                                resources,
245
                                resources_data,
246
                                consumptions,
247
                                consumptions_data,
248
                                self._nodes,
249
                                tmp_child_nodes,
250
                                direct_predecessors,
251
                                direct_successors,
252
                                successors)
253
254
                # Observe node for state changes.
255
                node.register_observer(self.slot_node_state_change)
256
257
                # Signal a new node has been created.
258
                self._notify_observers_new_node(self, node)
259
260
        # Create a map from rnd_id to all its child nodes.
261
        for (rnd_id_parent, nodes) in tmp_child_nodes.items():
262
            self._children[rnd_id_parent] = []
263
            for node_data in nodes:
264
                self._children[rnd_id_parent].append(self._nodes[node_data['rnd_id']])
265
266
        # Create a map from rnd_id to all its successor nodes.
267
        for (rnd_id, edges) in successors.items():
268
            self._successors[rnd_id] = []
269
            for rnd_id_successor in edges.keys():
270
                self._successors[rnd_id].append(self._nodes[rnd_id_successor])
271
272
        # Store the schedule, activate, and arrest node.
273
        self._schedule_node = self._nodes[schedule['rnd_id_schedule']]
274
        self._activate_node = self._nodes[schedule['rnd_id_activate']]
275
        self._arrest_node = self._nodes[schedule['rnd_id_arrest']]
276
277
    # ------------------------------------------------------------------------------------------------------------------
278
    def destroy(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
279
        for node in self._nodes.values():
280
            node.destroy()
281
282
    # ------------------------------------------------------------------------------------------------------------------
283
    @staticmethod
284
    def _create_successor_lookup_table1(nodes_data, child_nodes_data, node_ports_data, ports_data, dependants_data):
285
        """
286
        :param dict nodes_data:
287
        :param dict child_nodes_data:
288
        :param dict node_ports_data:
289
        :param dict ports_data:
290
        :param dict dependants_data:
291
292
        :rtype: dict
293
        """
294
        direct_lookup = {}
295
        for node_data in nodes_data.values():
296
            if node_data['rnd_id'] not in child_nodes_data:
297
                # Node is a simple node.
298
                direct_lookup[node_data['rnd_id']] = []
299
                if node_data['rnd_id'] in node_ports_data:
300
                    for port in node_ports_data[node_data['rnd_id']]:
301
                        if port['ptt_id'] == enarksh.ENK_PTT_ID_OUTPUT:
302
                            if port['prt_id'] in dependants_data:
303
                                for edge in dependants_data[port['prt_id']]:
304
                                    rnd_id = ports_data[edge['prt_id_dependant']]['rnd_id']
305
                                    if rnd_id in child_nodes_data:
306
                                        # Node rnd_id is a complex node.
307
                                        Schedule._create_successor_lookup_table2(
308
                                            direct_lookup[node_data['rnd_id']],
309
                                            nodes_data,
310
                                            child_nodes_data,
311
                                            ports_data,
312
                                            dependants_data,
313
                                            edge['prt_id_dependant'])
314
                                    else:
315
                                        # Node rnd_id is a simple node.
316
                                        direct_lookup[node_data['rnd_id']].append(rnd_id)
317
318
        return direct_lookup
319
320
    # ------------------------------------------------------------------------------------------------------------------
321
    @staticmethod
322
    def _create_successor_lookup_table2(lookup, nodes_data, child_nodes_data, ports_data, dependants_data, prt_id):
323
        """
324
        :param list lookup:
325
        :param dict nodes_data:
326
        :param dict child_nodes_data:
327
        :param dict ports_data:
328
        :param dict dependants_data:
329
        :param prt_id:
330
        """
331
        rnd_id1 = ports_data[prt_id]['rnd_id']
332
        if rnd_id1 in child_nodes_data:
333
            # Node rnd_id is a complex node.
334
            if prt_id in dependants_data:
335
                for edge in dependants_data[prt_id]:
336
                    Schedule._create_successor_lookup_table2(lookup,
337
                                                             nodes_data,
338
                                                             child_nodes_data,
339
                                                             ports_data,
340
                                                             dependants_data,
341
                                                             edge['prt_id_dependant'])
342
        else:
343
            # Node rnd_id is a simple node.
344
            lookup.append(rnd_id1)
345
346
    # ------------------------------------------------------------------------------------------------------------------
347
    @staticmethod
348
    def _create_successor_lookup_table3(lookup, direct_lookup, rnd_id_predecessor):
349
        """
350
        :param dict lookup:
351
        :param dict direct_lookup:
352
        :param int rnd_id_predecessor:
353
        """
354
        if rnd_id_predecessor in direct_lookup:
355
            for rnd_id_successor in direct_lookup[rnd_id_predecessor]:
356
                if rnd_id_successor not in lookup:
357
                    lookup[rnd_id_successor] = False
358
                    Schedule._create_successor_lookup_table3(lookup, direct_lookup, rnd_id_successor)
359
360
    # ------------------------------------------------------------------------------------------------------------------
361
    @staticmethod
362
    def _create_predecessor_lookup_table1(direct_successors):
363
        """
364
        :param dict direct_successors:
365
        :rtype: dict
366
        """
367
        predecessors = {}
368
369
        for rnd_id_predecessor in direct_successors.keys():
370
            for rnd_id_successor in direct_successors[rnd_id_predecessor]:
371
                if rnd_id_successor not in predecessors:
372
                    predecessors[rnd_id_successor] = []
373
                predecessors[rnd_id_successor].append(rnd_id_predecessor)
374
375
        return predecessors
376
377
    # ------------------------------------------------------------------------------------------------------------------
378
    def get_all_nodes(self):
379
        """
380
        Returns all nodes in this schedule.
381
382
        :rtype: dict
383
        """
384
        return self._nodes
385
386
    # ------------------------------------------------------------------------------------------------------------------
387
    def get_node(self, rnd_id):
388
        """
389
        Returns a node.
390
391
        :param int rnd_id: int The ID of the requested node.
392
393
        :rtype: enarksh.Controller.Node.Node
394
        """
395
        return self._nodes[rnd_id]
396
397
    # ------------------------------------------------------------------------------------------------------------------
398
    @staticmethod
399
    def register_observer_new_node(method):
400
        """
401
        Registers an object as an observer of the state of this object.
402
        """
403
        Schedule._observers_new_node.append(method)
404
405
    # ------------------------------------------------------------------------------------------------------------------
406
    @staticmethod
407
    def register_observer_schedule_termination(method):
408
        """
409
        Registers an object as an observer for termination of this schedule.
410
        """
411
        Schedule._observers_schedule_termination.append(method)
412
413
    # ------------------------------------------------------------------------------------------------------------------
414
    @staticmethod
415
    def _notify_observers_new_node(schedule, node):
416
        """
417
        Notifies all observers that a new node has been created.
418
419
        :param schedule:
420
        :param enarksh.Controller.Node.Node node:
421
        """
422
        for method in Schedule._observers_new_node:
423
            method(schedule, node)
424
425
    # ------------------------------------------------------------------------------------------------------------------
426
    @staticmethod
427
    def _notify_observers_schedule_termination(schedule, rst_id):
428
        """
429
        Notifies all observers that a schedule has terminated.
430
431
        :param schedule:
432
        :param int rst_id:
433
        """
434
        for method in Schedule._observers_schedule_termination:
435
            method(schedule, rst_id)
436
437
    # ------------------------------------------------------------------------------------------------------------------
438
    def _test_node_run_status_of_successor(self, rnd_id, statuses, seen):
439
        """
440
        :param int rnd_id:
441
        :param tuple statuses:
442
        :param seen:
443
444
        :rtype: bool
445
        """
446
        if rnd_id in self._children:
447
            # Node rnd_id is complex node.
448
            for node in self._children[rnd_id]:
449
                if node.rnd_id not in seen:
450
                    seen.add(node.rnd_id)
451
                    if self._test_node_run_status_of_successor(node.rnd_id, statuses, seen):
452
                        return True
453
454
        else:
455
            # Node rnd_id is a simple node.
456
            if self._nodes[rnd_id].get_rst_id() in statuses:
457
                return True
458
459
            if rnd_id in self._successors:
460
                for node in self._successors[rnd_id]:
461
                    if node.rnd_id not in seen:
462
                        seen.add(node.rnd_id)
463
                        if node.get_rst_id() in statuses:
464
                            return True
465
466
                        if self._test_node_run_status_of_successor(node.rnd_id, statuses, seen):
467
                            return True
468
469
        return False
470
471
    # ------------------------------------------------------------------------------------------------------------------
472
    def request_possible_node_actions(self, rnd_id, message):
473
        """
474
        Returns the possible actions for a node.
475
476
        :param int rnd_id: The ID of the node.
477
        :param dict message: The message containing possible actions and mail options.
478
479
        :rtype dict: The message with possible node actions enabled.
480
        """
481
        # Find node in map from rnd_id to node.
482
        node = self._nodes.get(rnd_id, None)
483
        if not node:
484
            # Node is not part of a current run of this schedule .
485
            return message
486
487
        # Set the mail options.
488
        message['mail_on_completion'] = self._mail_on_completion
489
        message['mail_on_error'] = self._mail_on_error
490
491
        # Get the current run status of the node.
492
        rst_id = node.get_rst_id()
493
494
        if self._schedule_node.rnd_id == rnd_id:
495
            # Node rnd_id is the schedule it self.
496
            if rst_id == enarksh.ENK_RST_ID_WAITING:
497
                message['actions'][enarksh.ENK_ACT_ID_TRIGGER]['act_enabled'] = True
498
                errors = self._test_node_run_status_of_successor(rnd_id, (enarksh.ENK_RST_ID_ERROR,), set())
499
                if errors:
500
                    message['actions'][enarksh.ENK_ACT_ID_RESTART_FAILED]['act_enabled'] = True
501 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
502
            elif rst_id == enarksh.ENK_RST_ID_QUEUED:
503
                # No actions are possible.
504
                pass
505
506
            elif rst_id == enarksh.ENK_RST_ID_RUNNING:
507
                errors = self._test_node_run_status_of_successor(rnd_id, (enarksh.ENK_RST_ID_ERROR,), set())
508
                if errors:
509
                    message['actions'][enarksh.ENK_ACT_ID_RESTART_FAILED]['act_enabled'] = True
510
511
            elif rst_id == enarksh.ENK_RST_ID_COMPLETED:
512
                message['actions'][enarksh.ENK_ACT_ID_TRIGGER]['act_enabled'] = True
513
514
            elif rst_id == enarksh.ENK_RST_ID_ERROR:
515
                message['actions'][enarksh.ENK_ACT_ID_TRIGGER]['act_enabled'] = True
516
                message['actions'][enarksh.ENK_ACT_ID_RESTART_FAILED]['act_enabled'] = True
517
518
            else:
519
                raise Exception("Unexpected rst_id '%s'." % rst_id)
520
521
            return message
522
523
        if self._activate_node.rnd_id == rnd_id:
524
            # Node rnd_id is the trigger of the schedule.
525
            busy = self._test_node_run_status_of_successor(rnd_id, (enarksh.ENK_RST_ID_RUNNING,
526
                                                                    enarksh.ENK_RST_ID_QUEUED), set())
527
            if not busy:
528
                message['actions'][enarksh.ENK_ACT_ID_TRIGGER]['act_enabled'] = True
529
530
            return message
531
532
        if self._arrest_node.rnd_id == rnd_id:
533
            # No actions are possible for an arrest node of a schedule.
534
            return message
535
536
        # Node is not an activate node nor an arrest node of the schedule.
537
        if rst_id == enarksh.ENK_RST_ID_WAITING:
538
            # Node is waiting for 1 or more predecessors are completed.
539
            # No actions are possible.
540
            pass
541
542
        elif rst_id == enarksh.ENK_RST_ID_QUEUED:
543
            # All predecessor are completed, but node is waiting for resources become available.
544
            # No actions are possible.
545
            pass
546
547
        elif rst_id == enarksh.ENK_RST_ID_RUNNING:
548
            # Node is running.
549
            # No actions are possible for simple nodes.
550
            if node.is_complex_node():
551
                errors = self._test_node_run_status_of_successor(rnd_id, (enarksh.ENK_RST_ID_ERROR,), set())
552
                if errors:
553
                    message['actions'][enarksh.ENK_ACT_ID_RESTART_FAILED]['act_enabled'] = True
554 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
555
        elif rst_id == enarksh.ENK_RST_ID_COMPLETED:
556
            # Node has completed successfully.
557
            busy = self._test_node_run_status_of_successor(rnd_id, (enarksh.ENK_RST_ID_RUNNING,), set())
558
            if not busy:
559
                message['actions'][enarksh.ENK_ACT_ID_RESTART]['act_enabled'] = True
560
561
        elif rst_id == enarksh.ENK_RST_ID_ERROR:
562
            if node.is_complex_node():
563
                message['actions'][enarksh.ENK_ACT_ID_RESTART]['act_enabled'] = True
564
                message['actions'][enarksh.ENK_ACT_ID_RESTART_FAILED]['act_enabled'] = True
565
566
            elif node.is_simple_node():
567
                message['actions'][enarksh.ENK_ACT_ID_RESTART]['act_enabled'] = True
568
569
            else:
570
                raise Exception('Internal error.')
571
572
        else:
573
            raise Exception("Unexpected rst_id '%d'." % rst_id)
574
575
        return message
576
577
    # ------------------------------------------------------------------------------------------------------------------
578
    def _node_action_restart(self, rnd_id):
579
        """
580
        Restarts a node.
581
582
        :param int rnd_id: The ID of the node.
583
584
        :rtype bool: True if the controller must reload the schedule. False otherwise.
585
        """
586
        # Find node in map from rnd_id to node.
587
        node = self._nodes.get(rnd_id, None)
588
        if not node:
589
            # Node is not part of a current run of a this schedule.
590
            return False
591
592
        node.restart()
593
594
        return False
595
596
    # ------------------------------------------------------------------------------------------------------------------
597
    def _node_action_trigger_schedule(self, rnd_id):
598
        """
599
        Triggers the schedule.
600
601
        :param int rnd_id:
602
603
        :rtype bool: True if the controller must reload the schedule. False otherwise.
604
        """
605
        run_id = DataLayer.enk_back_schedule_trigger(self._sch_id)
606
607
        if not run_id:
608
            node = self._nodes[rnd_id]
609
            node.set_rst_id(enarksh.ENK_RST_ID_QUEUED)
610
611
        return run_id
612
613
    # ------------------------------------------------------------------------------------------------------------------
614
    def _node_action_restart_failed(self, rnd_id):
615
        """
616
        Restarts a node.
617
618
        :param int rnd_id: The ID of the node.
619
620
        :rtype bool: False.
621
        """
622
        # Find node in map from rnd_id to node.
623
        node = self._nodes.get(rnd_id, None)
624
        if not node:
625
            # Node is not part of a current run of a this schedule.
626
            return False
627
628
        node.restart_failed()
629
630
        return False
631
632
    # ------------------------------------------------------------------------------------------------------------------
633
    def event_node_stop(self, rnd_id, exit_status):
634
        """
635
        :param int rnd_id:
636
        :param int exit_status:
637
638
        :rtype: None
639
        """
640
        # Find node in map from rnd_id to node.
641
        node = self._nodes.get(rnd_id, None)
642
        if not node:
643
            # Node is not part of a current run of a this schedule.
644
            return
645
646
        node.stop(exit_status)
647
648
    # ------------------------------------------------------------------------------------------------------------------
649
    def _send_mail_on_error(self, node):
650
        """
651
        Sends an email to the administrator that a simple node has failed.
652
653
        :param enarksh.Controller.Node.Node node: The node that has failed.
654
        """
655
        try:
656
            user = DataLayer.enk_back_get_user_info(self._usr_login)
657
658
            body = "Dear Enarksh user,"
659
            ""
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
660
            "Job " + str(node.name) + " has run unsuccessfully."
0 ignored issues
show
Unused Code Bug introduced by
The expression (('Job ') + (str(node.na...s run unsuccessfully.') does not seem to have sideeffects and its result is not used.

If a expression has no sideeffects (any lasting effect after it has been called) and its return value is not used, this usually means that this code can be removed or that an assignment is missing.

Loading history...
661
            ""
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
662
            "Greetings from Enarksh"
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
663
            subject = "Job of schedule " + str(self._schedule_node.name) + "failed."
664
665
            msg = MIMEText(body)
666
            msg['Subject'] = subject
667
            msg['To'] = user['usr_email']
668
            msg['From'] = user['usr_email']
669
670
            # Send the message via our local SMTP server.
671
            s = smtplib.SMTP('localhost')
0 ignored issues
show
Coding Style Naming introduced by
The name s does not conform to the variable naming conventions ([a-z_][a-z0-9_]{1,60}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
672
            s.send_message(msg)
673
            s.quit()
674
        except Exception as exception:
0 ignored issues
show
Best Practice introduced by
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

Loading history...
675
            print(exception, file=sys.stderr)
676
            traceback.print_exc(file=sys.stderr)
677
678
    # ------------------------------------------------------------------------------------------------------------------
679
    def _send_mail_on_completion(self):
680
        """
681
        Sends an email to the administrator that the schedule has completed.
682
        """
683
        try:
684
            user = DataLayer.enk_back_get_user_info(self._usr_login)
685
686
            if self._schedule_node.rst_id == enarksh.ENK_RST_ID_ERROR:
687
                body = "Dear Enarksh user,"
688
                ""
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
689
                "Schedule " + str(self._schedule_node.name) + " has finished unsuccessfully."
0 ignored issues
show
Unused Code Bug introduced by
The expression (('Schedule ') + (str(se...ished unsuccessfully.') does not seem to have sideeffects and its result is not used.

If a expression has no sideeffects (any lasting effect after it has been called) and its return value is not used, this usually means that this code can be removed or that an assignment is missing.

Loading history...
690
                ""
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
691
                "Greetings from Enarksh"
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
692
                subject = "Schedule " + self._schedule_node.name + "finished unsuccessfully."
693
            else:
694
                body = "Dear Enarksh user,"
695
                ""
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
696
                "Schedule " + str(self._schedule_node.name) + " has finished successfully."
0 ignored issues
show
Unused Code Bug introduced by
The expression (('Schedule ') + (str(se...inished successfully.') does not seem to have sideeffects and its result is not used.

If a expression has no sideeffects (any lasting effect after it has been called) and its return value is not used, this usually means that this code can be removed or that an assignment is missing.

Loading history...
697
                ""
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
698
                "Greetings from Enarksh"
0 ignored issues
show
Unused Code introduced by
This string statement has no effect and could be removed.
Loading history...
699
                subject = "Schedule " + self._schedule_node.name + "finished successfully."
700
701
            msg = MIMEText(body)
702
            msg['Subject'] = subject
703
            msg['To'] = user['usr_email']
704
            msg['From'] = user['usr_email']
705
706
            # Send the message via our local SMTP server.
707
            s = smtplib.SMTP('localhost')
0 ignored issues
show
Coding Style Naming introduced by
The name s does not conform to the variable naming conventions ([a-z_][a-z0-9_]{1,60}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
708
            s.send_message(msg)
709
            s.quit()
710
        except Exception as exception:
0 ignored issues
show
Best Practice introduced by
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

Loading history...
711
            print(exception, file=sys.stderr)
712
            traceback.print_exc(file=sys.stderr)
713
714
    # ------------------------------------------------------------------------------------------------------------------
715
    def slot_node_state_change(self, node, old, new):
716
        """
717
        :param enarksh.Controller.Node.Node node:
718
        :param dict old:
719
        :param dict new:
720
        """
721
        # If required: sync the status of the node to the database.
722
        if old['rst_id'] != new['rst_id']:
723
            node.sync_state()
724
725
        # If required: update the queue.
726
        if old['rst_id'] != new['rst_id'] and node.is_simple_node():
727
            if new['rst_id'] == enarksh.ENK_RST_ID_QUEUED:
728
                self._queue.add(node)
729
            elif old['rst_id'] == enarksh.ENK_RST_ID_QUEUED:
730
                self._queue.discard(node)
731
732
        # Adjust the schedule load (i.e. number of running nodes) of this schedule.
733
        if node.is_simple_node():
734
            if old['rst_id'] != new['rst_id']:
735
                if new['rst_id'] == enarksh.ENK_RST_ID_RUNNING:
736
                    self._schedule_load += 1
737
                if old['rst_id'] == enarksh.ENK_RST_ID_RUNNING:
738
                    self._schedule_load -= 1
739
740
        # Adjust all mappings from rnd_id.
741
        if old['rnd_id'] != new['rnd_id']:
742
            if old['rnd_id'] in self._children:
743
                self._children[new['rnd_id']] = self._children[old['rnd_id']]
744
                del self._children[old['rnd_id']]
745
746
            self._nodes[new['rnd_id']] = self._nodes[old['rnd_id']]
747
            del self._nodes[old['rnd_id']]
748
749
            if old['rnd_id'] in self._successors:
750
                self._successors[new['rnd_id']] = self._successors[old['rnd_id']]
751
                del self._successors[old['rnd_id']]
752
753
        # If a simple node has failed send mail to administrator.
754
        if node.is_simple_node() and old['rst_id'] != new['rst_id']:
755
            if new['rst_id'] == enarksh.ENK_RST_ID_ERROR and self._mail_on_error:
756
                self._send_mail_on_error(node)
757
758
        # If the schedule has finished send an mail to the administrator.
759
        if node == self._schedule_node and old['rst_id'] != new['rst_id']:
760
            if new['rst_id'] in (enarksh.ENK_RST_ID_ERROR, enarksh.ENK_RST_ID_COMPLETED):
761
                self._send_mail_on_completion()
762
763
        # If the schedule has terminated inform all observer of this event.
764
        if node == self._schedule_node and old['rst_id'] != new['rst_id']:
765
            if new['rst_id'] in (enarksh.ENK_RST_ID_ERROR, enarksh.ENK_RST_ID_COMPLETED):
766
                self._notify_observers_schedule_termination(self, new['rst_id'])
767
768
    # ------------------------------------------------------------------------------------------------------------------
769
    @staticmethod
770
    def slot_resource_state_change(resource, old, new):
0 ignored issues
show
Unused Code introduced by
The argument new seems to be unused.
Loading history...
Unused Code introduced by
The argument old seems to be unused.
Loading history...
771
        """
772
        :param enarksh.Controller.Resource resource:
773
        :param dict old:
774
        :param dict new:
775
        """
776
        resource.sync_state()
777
778
    # ------------------------------------------------------------------------------------------------------------------
779
    def get_schedule_load(self):
780
        """
781
        Returns the schedule load of this schedule. I.e. the number of nodes of ths schedule that are currently running.
782
783
        :rtype: int
784
        """
785
        return self._schedule_load
786
787
    # ------------------------------------------------------------------------------------------------------------------
788
    def get_activate_node(self):
789
        """
790
        Returns the node that is the activate node of this schedule.
791
792
        :rtype: enarksh.Controller.Node.Node
793
        """
794
        return self._activate_node
795
796
    # ------------------------------------------------------------------------------------------------------------------
797
    def request_node_action(self, rnd_id, act_id, usr_login, mail_on_completion, mail_on_error):
798
        """
799
        Executes a node action.
800
801
        :param int rnd_id: The ID of the node.
802
        :param int act_id: The ID of the action.
803
        :param str usr_login:
804
        :param bool mail_on_completion:
805
        :param bool mail_on_error:
806
807
        :rtype bool: True if the controller must reload the schedule. False otherwise.
808
        """
809
        # Store the mail options.
810
        self._usr_login = usr_login
811
        self._mail_on_completion = mail_on_completion
812
        self._mail_on_error = mail_on_error
813
814
        if self._schedule_node.rnd_id == rnd_id:
815
            # Node is the schedule is self.
816
            if act_id == enarksh.ENK_ACT_ID_TRIGGER:
817
                return self._node_action_trigger_schedule(rnd_id)
818
819
            if act_id == enarksh.ENK_ACT_ID_RESTART_FAILED:
820
                return self._node_action_restart_failed(rnd_id)
821
822
            raise Exception("Unknown or unsupported act_id '%s'." % act_id)
823
824
        if self._activate_node.rnd_id == rnd_id:
825
            # Node is the activate node of the schedule.
826
            if act_id == enarksh.ENK_ACT_ID_TRIGGER:
827
                return self._node_action_trigger_schedule(rnd_id)
828
829
            raise Exception("Unknown or unsupported act_id '%s'." % act_id)
830
831
        if self._arrest_node.rnd_id == rnd_id:
832
            # Node is the arrest node of the schedule. No actions are possible.
833
            raise Exception("Unknown or unsupported act_id '%s'." % act_id)
834
835
        # Node is a "normal" node in the schedule.
836
        if act_id == enarksh.ENK_ACT_ID_RESTART:
837
            return self._node_action_restart(rnd_id)
838
839
        if act_id == enarksh.ENK_ACT_ID_RESTART_FAILED:
840
            return self._node_action_restart_failed(rnd_id)
841
842
        raise Exception("Unknown or unsupported act_id '%s'." % act_id)
843
844
    # ------------------------------------------------------------------------------------------------------------------
845
    @staticmethod
846
    def queue_compare(node1, node2):
847
        """
848
        Compares two nodes for sorting queued nodes.
849
850
        :param enarksh.Controller.Node.Node node1:
851
        :param enarksh.Controller.Node.Node node2:
852
853
        :rtype: int
854
        """
855
        # Sort by scheduling weight.
856
        cmp = node1.get_schedule_wait() - node2.get_schedule_wait()
857
858
        if cmp == 0:
859
            if node1.get_name().lower() < node2.get_name().lower():
860
                cmp = -1
861
            elif node1.get_name().lower() > node2.get_name().lower():
862
                cmp = 1
863
            else:
864
                cmp = 0
865
866
        return cmp
867
868
    # ------------------------------------------------------------------------------------------------------------------
869
    def get_queue(self):
870
        """
871
        Returns the queued nodes sorted by scheduling wait.
872
873
        :rtype: set
874
        """
875
        return sorted(self._queue, key=functools.cmp_to_key(Schedule.queue_compare))
876
877
878
# ----------------------------------------------------------------------------------------------------------------------
879