GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Node.predecessor_node_event_state_change_handler()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 14
ccs 0
cts 5
cp 0
crap 6
rs 9.4285
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
import logging
10
from time import strftime, localtime
11
12
from enarksh.C import C
13
from enarksh.DataLayer import DataLayer
14
from enarksh.controller.StateChange import StateChange
15
16
17
class Node(StateChange, metaclass=abc.ABCMeta):
    """
    Abstract class for objects in the controller of type 'Node'.

    A node keeps track of its run status, its resources and consumptions, and its relations with other nodes (parent,
    children, direct predecessors and direct successors). Run status changes of child and predecessor nodes are
    observed through the state change events of those nodes.
    """
    # Weight of each run status. The run status of a complex node is the run status with the highest weight among the
    # run statuses of its child nodes (see child_node_event_state_change_handler).
    _rst_id_weight = {C.ENK_RST_ID_RUNNING:   5,
                      C.ENK_RST_ID_QUEUED:    4,
                      C.ENK_RST_ID_ERROR:     3,
                      C.ENK_RST_ID_WAITING:   2,
                      C.ENK_RST_ID_COMPLETED: 1}

    # The inverse of _rst_id_weight: maps a weight back to the run status.
    _weight_rst_id = {5: C.ENK_RST_ID_RUNNING,
                      4: C.ENK_RST_ID_QUEUED,
                      3: C.ENK_RST_ID_ERROR,
                      2: C.ENK_RST_ID_WAITING,
                      1: C.ENK_RST_ID_COMPLETED}

    event_new_node_creation = None
    """
    The event that will be fired when a new node has been created. Must be set before the first node is instantiated
    (the constructor fires this event unconditionally).

    :type: enarksh.event.Event.Event
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, node_data):
        """
        Object constructor. Fires Node.event_new_node_creation once the node has been set up.

        :param dict node_data: The details of this node. The keys 'rnd_id', 'nod_name', 'rst_id',
                               'rnd_datetime_start', and 'rnd_datetime_stop' are required.
        """
        StateChange.__init__(self)

        self._rnd_id = node_data['rnd_id']
        """
        The ID of this (run) node.

        :type: int
        """

        # @todo XXX DataLayer encoding issue
        # The data layer may return the name either as bytes or as str. Decoding a str with str(name, 'utf-8') raises
        # a TypeError, hence only decode when the name is not a str already.
        node_name = node_data['nod_name']
        self._node_name = node_name if isinstance(node_name, str) else str(node_name, 'utf-8')
        """
        The name of this node.

        :type: str
        """

        self._rst_id = node_data['rst_id']
        """
        The ID of the run status of this node.

        :type: int
        """

        self._rnd_datetime_start = node_data['rnd_datetime_start']
        """
        The date and time this node has been started. The setter of rst_id stores a local time string formatted as
        'YYYY-MM-DD HH:MM:SS'; the initial value is taken as is from node_data.

        :type: str|None (presumably; the type of the initial value depends on the data layer)
        """

        self._rnd_datetime_stop = node_data['rnd_datetime_stop']
        """
        The date and time this node has finished. The setter of rst_id stores a local time string formatted as
        'YYYY-MM-DD HH:MM:SS'; the initial value is taken as is from node_data.

        :type: str|None (presumably; the type of the initial value depends on the data layer)
        """

        self._exit_status = None
        """
        The exit status of the job of this node.

        :type: None|bool
        """

        self.consumptions = []
        """
        The consumptions of this node.

        :type: list[enarksh.controller.consumption.Consumption.Consumption]
        """

        self.resources = []
        """
        The resources of this node.

        :type: list[enarksh.controller.resource.Resource.Resource]
        """

        self._scheduling_weight = 0
        """
        The weight of this node to be taken into account when sorting queued nodes.

        :type: int
        """

        self._parent_node = None
        """
        The parent node of this node.

        :type: enarksh.controller.node.Node.Node|None
        """

        self._child_nodes = []
        """
        The child nodes of this node. This list is empty for simple nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._predecessor_nodes = []
        """
        The direct (simple) predecessor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._successor_nodes = []
        """
        The direct (simple) successor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        Node.event_new_node_creation.fire(self)

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def name(self):
        """
        Getter for name. Returns the name of this node.

        :rtype: str
        """
        return self._node_name

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def rnd_id(self):
        """
        Getter for rnd_id. Returns the ID of this node.

        :rtype: int
        """
        return self._rnd_id

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def rst_id(self):
        """
        Getter for rst_id. Returns the ID of the run status of this node.

        :rtype: int
        """
        return self._rst_id

    # ------------------------------------------------------------------------------------------------------------------
    @StateChange.wrapper
    def __rst_id_wrapper(self, rst_id):
        """
        Sets the run status of this node. The actual assignment is wrapped by StateChange.wrapper; the bookkeeping of
        the start and stop date and time is done by the setter of rst_id.

        :param int rst_id: The ID of the run status.
        """
        self._rst_id = rst_id

    # ------------------------------------------------------------------------------------------------------------------
    @rst_id.setter
    def rst_id(self, rst_id):
        """
        Sets the run status of this node and updates the start and stop date and time of this node accordingly.

        :param int rst_id: The new run status for this node.
        """
        old_rst_id = self.rst_id
        self.__rst_id_wrapper(rst_id)

        # Update the start datetime of this node. An already known start datetime is preserved; the stop datetime is
        # always cleared because the node is running (again).
        if rst_id == C.ENK_RST_ID_RUNNING:
            if not self._rnd_datetime_start:
                self._rnd_datetime_start = strftime("%Y-%m-%d %H:%M:%S", localtime())
            self._rnd_datetime_stop = None

        # Update the stop datetime of this node, but only when the node actually changed to a finished run status.
        if old_rst_id != rst_id and rst_id in (C.ENK_RST_ID_COMPLETED, C.ENK_RST_ID_ERROR):
            self._rnd_datetime_stop = strftime("%Y-%m-%d %H:%M:%S", localtime())

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def scheduling_weight(self):
        """
        Returns the scheduling weight (i.e. the number of (direct and indirect) simple successors).

        :rtype: int
        """
        return self._scheduling_weight

    # ------------------------------------------------------------------------------------------------------------------
    def get_state_attributes(self):
        """
        Returns the attributes that make up the state of this node: the ID and the run status of this node.

        :rtype: dict[str,int]
        """
        return {'rnd_id': self.rnd_id,
                'rst_id': self.rst_id}

    # ------------------------------------------------------------------------------------------------------------------
    def __del__(self):
        """
        Object destructor. Logs (at debug level) the run status of the node being deleted.
        """
        log = logging.getLogger('enarksh')
        # The destructor also runs for an instance whose constructor failed before _rst_id was set; never raise from
        # a destructor because of a missing attribute.
        log.debug('Deleting node: rst_id: %s', getattr(self, '_rst_id', None))

    # ------------------------------------------------------------------------------------------------------------------
    def acquire_resources(self):
        """
        Acquires the resources required by this node.
        """
        for consumption in self.consumptions:
            consumption.acquire_resource()

    # ------------------------------------------------------------------------------------------------------------------
    def inquire_resources(self):
        """
        Returns true when there are enough resources available to start this node. Otherwise returns false.

        The consumptions are inquired in order and inquiring stops at the first consumption that can not be
        satisfied. A node without consumptions can always be started.

        :rtype: bool
        """
        ret = True

        for consumption in self.consumptions:
            ret = consumption.inquire_resource()
            if not ret:
                break

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def release_resources(self):
        """
        Releases the resources required by this node.
        """
        for consumption in self.consumptions:
            consumption.release_resource()

    # ------------------------------------------------------------------------------------------------------------------
    def _recompute_run_status(self):
        """
        Recomputes the run status of this node based on the run statuses of the predecessor nodes of this node.

        * When all predecessors are completed this node is (renewed if required and) queued.
        * When at least one predecessor is neither completed nor in error and this node is not waiting, this node is
          (renewed if required and) set to waiting.

        Nothing happens when this node has no predecessors.
        """
        if not self._predecessor_nodes:
            return

        # Take a snapshot of the run statuses before this node changes state.
        statuses = [predecessor.rst_id for predecessor in self._predecessor_nodes]
        all_completed = all(status == C.ENK_RST_ID_COMPLETED for status in statuses)
        any_not_finished = any(status not in (C.ENK_RST_ID_COMPLETED, C.ENK_RST_ID_ERROR) for status in statuses)

        if all_completed:
            # All predecessors have run status completed.
            self._renew()
            self.rst_id = C.ENK_RST_ID_QUEUED

        elif any_not_finished and self.rst_id != C.ENK_RST_ID_WAITING:
            # A predecessor has been restarted.
            self._renew()
            self.rst_id = C.ENK_RST_ID_WAITING

    # ------------------------------------------------------------------------------------------------------------------
    def child_node_event_state_change_handler(self, _event, _event_data, _listener_data):
        """
        Event handler for a state change of a child node.

        :param * _event: Not used.
        :param * _event_data: Not used (the old and new state).
        :param * _listener_data: Not used.
        """
        del _event, _event_data, _listener_data

        # Compute the running status of this complex node based on the running statuses of its child nodes: the run
        # status with the highest weight wins.
        weight = max((Node._rst_id_weight[child_node.rst_id] for child_node in self._child_nodes), default=0)

        # Update the run status of this node.
        self.rst_id = self._weight_rst_id[weight]

    # ------------------------------------------------------------------------------------------------------------------
    def predecessor_node_event_state_change_handler(self, _event, event_data, _listener_data):
        """
        Event handler for a state change of a predecessor node. Recomputes the run status of this node when the run
        status of the predecessor has changed.

        :param * _event: Not used.
        :param tuple[dict,dict] event_data: The old and new state.
        :param * _listener_data: Not used.
        """
        del _event, _listener_data

        old, new = event_data

        if old['rst_id'] != new['rst_id']:
            self._recompute_run_status()

    # ------------------------------------------------------------------------------------------------------------------
    @StateChange.wrapper
    def _renew(self):
        """
        If required renews this node, i.e. creates a new row in ENK_RUN_NODE.

        Only a node with run status error or completed is renewed. A renewed node gets a new ID, run status waiting,
        and its start datetime, stop datetime, and exit status are cleared.
        """
        if self._rst_id in (C.ENK_RST_ID_ERROR, C.ENK_RST_ID_COMPLETED):
            self._rnd_id = DataLayer.enk_back_run_node_renew(self.rnd_id)
            self._rst_id = C.ENK_RST_ID_WAITING
            self._rnd_datetime_start = None
            self._rnd_datetime_stop = None
            self._exit_status = None

    # ------------------------------------------------------------------------------------------------------------------
    def initialize(self,
                   node_data,
                   schedule,
                   resources,
                   resources_data,
                   consumptions,
                   consumptions_data,
                   run_nodes,
                   child_nodes,
                   direct_predecessors,
                   direct_successors,
                   successors):
        """
        Initializes this node: sets the resources, consumptions, predecessors, children, parent, successors, and the
        scheduling weight of this node.

        :param dict node_data: The details of this node. The key 'rnd_id_parent' is required.
        :param dict schedule: Not used (kept for interface compatibility).
        :param dict resources: Map from rsc_id to resource.
        :param dict resources_data: Map from rnd_id to the details (with key 'rsc_id') of the resources of the node.
        :param dict consumptions: Map from cns_id to consumption.
        :param dict consumptions_data: Map from rnd_id to the details (with key 'cns_id') of the consumptions of the
                                       node.
        :param dict run_nodes: Map from rnd_id to node.
        :param dict child_nodes: Map from rnd_id to the details (with key 'rnd_id') of the child nodes of the node.
        :param dict direct_predecessors: Map from rnd_id to the IDs of the direct predecessors of the node.
        :param dict direct_successors: Map from rnd_id to the IDs of the direct successors of the node.
        :param dict successors: Map from rnd_id to the (direct and indirect) successors of the node.
        """
        # Initialize the resources of this node.
        for resource_data in resources_data.get(self.rnd_id, ()):
            self.resources.append(resources[resource_data['rsc_id']])

        # Initialize the consumptions of this node.
        for consumption_data in consumptions_data.get(self.rnd_id, ()):
            self.consumptions.append(consumptions[consumption_data['cns_id']])

        # Observe all direct predecessor nodes of this node (for simple nodes only).
        for predecessor in direct_predecessors.get(self.rnd_id, ()):
            node = run_nodes[predecessor]
            node.event_state_change.register_listener(self.predecessor_node_event_state_change_handler)
            self._predecessor_nodes.append(node)

        # Observe the child run_nodes of this node (for complex nodes only).
        for child_node in child_nodes.get(self.rnd_id, ()):
            node = run_nodes[child_node['rnd_id']]
            node.event_state_change.register_listener(self.child_node_event_state_change_handler)
            self._child_nodes.append(node)

        # Set the parent node of this node.
        if node_data['rnd_id_parent']:
            self._parent_node = run_nodes[node_data['rnd_id_parent']]

        # Set the direct successor nodes of this node.
        for successor in direct_successors.get(self.rnd_id, ()):
            self._successor_nodes.append(run_nodes[successor])

        # Set scheduling weight, i.e. the number of (direct and indirect) successors.
        if self.rnd_id in successors:
            self._scheduling_weight = len(successors[self.rnd_id])

    # ------------------------------------------------------------------------------------------------------------------
    def get_start_message(self, sch_id):
        """
        Returns the message to be sent to the spawner for starting this node. Raises an exception if this node can not
        be started by the spawner (e.g. a complex node).

        This default implementation always raises; node classes that can be started must override this method.

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.message.Message.Message

        :raises RuntimeError: Always.
        """
        raise RuntimeError("Node of class '{}' can not be started by the spawner".format(self.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    def start(self):
        """
        Does the housekeeping for starting this node. Returns True if an actual job must be started by the spawner.
        Returns False otherwise.

        This default implementation always raises; node classes that can be started must override this method.

        :rtype: bool

        :raises RuntimeError: Always.
        """
        raise RuntimeError("Node of class '{}' can not be marked started".format(self.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    def stop(self, exit_status):
        """
        Does the housekeeping when the node has stopped.

        This default implementation always raises; node classes that can be stopped must override this method.

        :param int exit_status: The exit status of the job.

        :raises RuntimeError: Always.
        """
        raise RuntimeError("Node of class '{}' can not be marked stopped".format(self.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def restart(self):
        """
        Restarts this node.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def restart_failed(self):
        """
        Restarts all failed simple nodes.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def fake_get_resource_by_name(self, name):
        """
        Returns a resource by name. The resources of this node are searched first, then (recursively) the resources of
        the parent nodes of this node. Returns None when no resource with the given name is found.

        :param str name: The name of the requested resource.

        :rtype: enarksh.controller.resource.Resource.Resource|None
        """
        for resource in self.resources:
            if resource.get_name() == name:
                return resource

        if self._parent_node:
            return self._parent_node.fake_get_resource_by_name(name)

        return None

    # ------------------------------------------------------------------------------------------------------------------
    def sync_state(self):
        """
        Updates the state of this node (run status, start and stop datetime, and exit status) into the database.
        """
        DataLayer.enk_back_run_node_update_status(self.rnd_id,
                                                  self.rst_id,
                                                  self._rnd_datetime_start,
                                                  self._rnd_datetime_stop,
                                                  self._exit_status)

    # ------------------------------------------------------------------------------------------------------------------
    def get_uri(self, obj_type='node'):
        """
        Returns the URI of this node. The URI is composed of '//', the entity type, and the names of all ancestors of
        this node and this node itself, separated by '/'.

        :param str obj_type: The entity type.

        :rtype: str
        """
        if self._parent_node:
            uri = self._parent_node.get_uri(obj_type)
        else:
            uri = '//' + obj_type

        return uri + '/' + self._node_name

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def is_simple_node(self):
        """
        Returns True if this node is a simple node. Otherwise, returns False.

        :rtype: bool
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def is_complex_node(self):
        """
        Returns True if this node is a complex node. Otherwise, returns False.

        :rtype: bool
        """
        raise NotImplementedError()
512
513
# ----------------------------------------------------------------------------------------------------------------------
514