GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Push — master ( 83e50d...75f367 )
by P.R.
02:59
created

Node.scheduling_weight()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 1 Features 0
Metric Value
cc 1
c 1
b 1
f 0
dl 0
loc 8
rs 9.4285
ccs 0
cts 2
cp 0
crap 2
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
from time import strftime, gmtime
10
11
import enarksh
12
from enarksh.DataLayer import DataLayer
13
from enarksh.controller.StateChange import StateChange
14
15
16
class Node(StateChange, metaclass=abc.ABCMeta):
17
    """
18
    Abstract class for objects in the controller of type 'Node'.
19
    """
20
    _rst_id_weight = {enarksh.ENK_RST_ID_RUNNING:   5,
21
                      enarksh.ENK_RST_ID_QUEUED:    4,
22
                      enarksh.ENK_RST_ID_ERROR:     3,
23
                      enarksh.ENK_RST_ID_WAITING:   2,
24
                      enarksh.ENK_RST_ID_COMPLETED: 1}
25
26
    _weight_rst_id = {5: enarksh.ENK_RST_ID_RUNNING,
27
                      4: enarksh.ENK_RST_ID_QUEUED,
28
                      3: enarksh.ENK_RST_ID_ERROR,
29
                      2: enarksh.ENK_RST_ID_WAITING,
30
                      1: enarksh.ENK_RST_ID_COMPLETED}
31
32
    # ------------------------------------------------------------------------------------------------------------------
33
    def __init__(self, node_data):
34
        """
35
        Object constructor
36
37
        :param dict node_data:
38
        """
39
        StateChange.__init__(self)
40
41
        self._rnd_id = node_data['rnd_id']
42
        """
43
        The ID of this (run) node.
44
45
        :type: int
46
        """
47
48
        self._node_name = str(node_data['nod_name'], 'utf-8')  # @todo XXX DataLayer encoding issue
49
        """
50
        The name of this node.
51
52
        :type: str
53
        """
54
55
        self._rst_id = node_data['rst_id']
56
        """
57
        The ID of the run status of this node.
58
59
        :type: int
60
        """
61
62
        self._rnd_datetime_start = node_data['rnd_datetime_start']
63
        """
64
        The epoch this node has been started.
65
66
        :type:
67
        """
68
69
        self._rnd_datetime_stop = node_data['rnd_datetime_stop']
70
        """
71
        The epoch this node has finished.
72
73
        :type:
74
        """
75
76
        self._exit_status = None
77
        """
78
        The exit status of the job of this node.
79
80
        :type: None|bool
81
        """
82
83
        self.consumptions = []
84
        """
85
        The consumptions of this node.
86
87
        :type: list[enarksh.controller.consumption.Consumption.Consumption]
88
        """
89
90
        self.resources = []
91
        """
92
        The resources of this node.
93
94
        :type: list[enarksh.controller.resource.Resource.Resource]
95
        """
96
97
        self._scheduling_weight = 0
98
        """
99
        The weight of this node to be taken into account when sorting queued nodes.
100
101
        :type: int
102
        """
103
104
        self._parent_node = None
105
        """
106
        The parent node of this node.
107
108
        :type: None|enarksh.controller.node.Node.Node
109
        """
110
111
        self._child_nodes = []
112
        """
113
        The child nodes of this node. This list is empty for simple nodes.
114
115
        :type: list[enarksh.controller.node.Node.Node]
116
        """
117
118
        self._predecessor_nodes = []
119
        """
120
        The direct (simple) predecessor nodes of this node. This list is empty for complex nodes.
121
122
        :type: list[enarksh.controller.node.Node.Node]
123
        """
124
125
        self._successor_nodes = []
126
        """
127
        The direct (simple) successor nodes of this node. This list is empty for complex nodes.
128
129
        :type: list[enarksh.controller.node.Node.Node]
130
        """
131
132
    # ------------------------------------------------------------------------------------------------------------------
133
    @property
134
    def name(self):
135
        """
136
        Getter for name. Returns the name of this node.
137
138
        :rtype: str
139
        """
140
        return self._node_name
141
142
    # ------------------------------------------------------------------------------------------------------------------
143
    @property
144
    def rnd_id(self):
145
        """
146
        Getter for rnd_id. Returns the ID of this node.
147
148
        :rtype: int
149
        """
150
        return self._rnd_id
151
152
    # ------------------------------------------------------------------------------------------------------------------
153
    @property
154
    def rst_id(self):
155
        """
156
        Getter for rst_id. Returns the ID of the run status of this node.
157
158
        :rtype: int
159
        """
160
        return self._rst_id
161
162
    # ------------------------------------------------------------------------------------------------------------------
163
    @StateChange.wrapper
164
    def _rst_id_wrapper(self, rst_id):  # XXX Hack!
165
        """
166
        Setter for rst_id. Sets the the run status of this node.
167
168
        :param int rst_id: The ID of the run status.
169
        """
170
        self._rst_id = rst_id
171
172
    # ------------------------------------------------------------------------------------------------------------------
173
    @rst_id.setter
174
    def rst_id(self, rst_id):
175
        """
176
        Setter for rst_id. Sets the the run status of this node.
177
178
        :param int rst_id: The ID of the run status.
179
        """
180
        self._rst_id_wrapper(rst_id)
181
182
    # ------------------------------------------------------------------------------------------------------------------
183
    @property
184
    def scheduling_weight(self):
185
        """
186
        Return the scheduling weight (i.e. the number (direct and indirect) of simple successors).
187
188
        :rtype: int
189
        """
190
        return self._scheduling_weight
191
192
    # ------------------------------------------------------------------------------------------------------------------
193
    def get_state_attributes(self):
194
        """
195
        :rtype: dict[str,int]
196
        """
197
        return {'rnd_id': self.rnd_id,
198
                'rst_id': self.rst_id}
199
200
    # ------------------------------------------------------------------------------------------------------------------
201
    def __del__(self):
202
        # print("Deleting node %s" % self.rnd_id)
203
        pass
204
205
    # ------------------------------------------------------------------------------------------------------------------
206
    def acquire_resources(self):
207
        """
208
        Acquires the resources required by this node.
209
        """
210
        for consumption in self.consumptions:
211
            consumption.acquire_resource()
212
213
    # ------------------------------------------------------------------------------------------------------------------
214
    def inquire_resources(self):
215
        """
216
        Returns true when there enough resources available to start this node. Otherwise returns false.
217
218
        :rtype: bool
219
        """
220
        ret = True
221
222
        for consumption in self.consumptions:
223
            ret = consumption.inquire_resource()
224
            if not ret:
225
                break
226
227
        return ret
228
229
    # ------------------------------------------------------------------------------------------------------------------
230
    def release_resources(self):
231
        """
232
        Releases the resources required by this node.
233
        """
234
        for consumption in self.consumptions:
235
            consumption.release_resource()
236
237
    # ------------------------------------------------------------------------------------------------------------------
238
    def _recompute_run_status(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
239
        if self._predecessor_nodes:
240
            count_not_completed = 0
241
            count_not_finished = 0
242
            for predecessor in self._predecessor_nodes:
243
                if predecessor.rst_id != enarksh.ENK_RST_ID_COMPLETED:
244
                    count_not_completed += 1
245
                if predecessor.rst_id != enarksh.ENK_RST_ID_COMPLETED \
246
                        and predecessor.rst_id != enarksh.ENK_RST_ID_ERROR:
247
                    count_not_finished += 1
248
249
            if count_not_completed == 0:
250
                # All predecessors have run status completed.
251
                self._renew()
252
                self._set_rst_id(enarksh.ENK_RST_ID_QUEUED)
253
254
            if count_not_finished != 0 and self.rst_id != enarksh.ENK_RST_ID_WAITING:
255
                # A predecessors is been restarted.
256
                self._renew()
257
                self._set_rst_id(enarksh.ENK_RST_ID_WAITING)
258
259
    # ------------------------------------------------------------------------------------------------------------------
260
    def _set_rst_id(self, rst_id):
261
        """
262
        Sets the run status of this node.
263
264
        :param int rst_id: The new run status for this node.
265
        """
266
        old_rst_id = self.rst_id
267
        self._rst_id = rst_id
268
269
        # Update the start datetime of this node.
270
        if rst_id == enarksh.ENK_RST_ID_RUNNING:
271
            if not self._rnd_datetime_start:
272
                self._rnd_datetime_start = strftime("%Y-%m-%d %H:%M:%S", gmtime())
273
            self._rnd_datetime_stop = None
274
275
        # Update the stop datetime of this node.
276
        if old_rst_id != rst_id and rst_id in (enarksh.ENK_RST_ID_COMPLETED, enarksh.ENK_RST_ID_ERROR):
277
            self._rnd_datetime_stop = strftime("%Y-%m-%d %H:%M:%S", gmtime())
278
279
    # ------------------------------------------------------------------------------------------------------------------
280
    @StateChange.wrapper
281
    def child_node_event_state_change_handler(self, _event, _event_data, _listener_data):
282
        """
283
        Event handler for a sate change of a child node.
284
285
        :param * _event: Not used.
286
        :param * _event_data: The old and new state.
287
        :param * _listener_data: Not used
288
        """
289
        del _event, _event_data, _listener_data
290
291
        # Compute the running status of this complex node based on the running statuses of its child nodes.
292
        weight = 0
293
        for child_node in self._child_nodes:
294
            weight = max(weight, Node._rst_id_weight[child_node.rst_id])
295
296
        # Update the run status of this node.
297
        self._set_rst_id(self._weight_rst_id[weight])
298
299
    # ------------------------------------------------------------------------------------------------------------------
300
    @StateChange.wrapper
301
    def predecessor_node_event_state_change_handler(self, _event, event_data, _listener_data):
302
        """
303
        Event handler for a sate change of a predecessor node.
304
305
        :param * _event: Not used.
306
        :param tuple[dict,dict] event_data: The old and new state.
307
        :param * _listener_data: Not used
308
        """
309
        del _event, _listener_data
310
311
        old, new = event_data
312
313
        if old['rst_id'] != new['rst_id']:
314
            self._recompute_run_status()
315
316
    # ------------------------------------------------------------------------------------------------------------------
317
    def _renew(self):
318
        """
319
        If required renews this node, i.e. creates a new row in ENK_RUN_NODE.
320
        """
321
        if self.rst_id in (enarksh.ENK_RST_ID_ERROR, enarksh.ENK_RST_ID_COMPLETED):
322
            self._rnd_id = DataLayer.enk_back_run_node_renew(self.rnd_id)
323
            self._rst_id = enarksh.ENK_RST_ID_WAITING
324
            self._rnd_datetime_start = None
325
            self._rnd_datetime_stop = None
326
            self._exit_status = None
327
328
    # ------------------------------------------------------------------------------------------------------------------
329
    def initialize(self,
330
                   node_data,
331
                   schedule,
0 ignored issues
show
Unused Code introduced by
The argument schedule seems to be unused.
Loading history...
332
                   resources,
333
                   resources_data,
334
                   consumptions,
335
                   consumptions_data,
336
                   run_nodes,
337
                   child_nodes,
338
                   direct_predecessors,
339
                   direct_successors,
340
                   successors):
341
        """
342
        :param dict node_data:
343
        :param dict schedule:
344
        :param dict resources:
345
        :param dict resources_data:
346
        :param dict consumptions:
347
        :param dict consumptions_data:
348
        :param dict run_nodes:
349
        :param dict child_nodes:
350
        :param dict direct_predecessors:
351
        :param dict direct_successors:
352
        :param dict successors:
353
        """
354
        # Initialize the resources of this node.
355
        if self.rnd_id in resources_data:
356
            for resource_data in resources_data[self.rnd_id]:
357
                self.resources.append(resources[resource_data['rsc_id']])
358
359
        # Initialize the consumptions of this node.
360
        if self.rnd_id in consumptions_data:
361
            for consumption_data in consumptions_data[self.rnd_id]:
362
                self.consumptions.append(consumptions[consumption_data['cns_id']])
363
364
        # Observe all direct predecessor nodes of this node (for simple nodes only) and initialize predecessor state
365
        # count.
366
        if self.rnd_id in direct_predecessors:
367
            for predecessor in direct_predecessors[self.rnd_id]:
368
                node = run_nodes[predecessor]
369
                node.event_state_change.register_listener(self.predecessor_node_event_state_change_handler)
370
                self._predecessor_nodes.append(node)
371
372
        # Observe the child run_nodes of this node (for complex nodes only).
373
        if self.rnd_id in child_nodes:
374
            for child_node in child_nodes[self.rnd_id]:
375
                node = run_nodes[child_node['rnd_id']]
376
                node.event_state_change.register_listener(self.child_node_event_state_change_handler)
377
                self._child_nodes.append(node)
378
379
        # Set the parent node of this node.
380
        if node_data['rnd_id_parent']:
381
            self._parent_node = run_nodes[node_data['rnd_id_parent']]
382
383
        #
384
        if self.rnd_id in direct_successors:
385
            for successor in direct_successors[self.rnd_id]:
386
                node = run_nodes[successor]
387
                self._successor_nodes.append(node)
388
389
        # Set scheduling weight, i.e. the number of (direct and indirect) successors.
390
        if self.rnd_id in successors:
391
            self._scheduling_weight = len(successors[self.rnd_id])
392
393
    # ------------------------------------------------------------------------------------------------------------------
394
    # @abc.abstractmethod
395
    def get_start_message(self, sch_id):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
396
        raise NotImplementedError()
397
398
    # ------------------------------------------------------------------------------------------------------------------
399
    @abc.abstractmethod
400
    def restart(self):
401
        """
402
        Restarts this node.
403
        """
404
        raise NotImplementedError()
405
406
    # ------------------------------------------------------------------------------------------------------------------
407
    @abc.abstractmethod
408
    def restart_failed(self):
409
        """
410
        Restarts all failed simple nodes.
411
        """
412
        raise NotImplementedError()
413
414
    # ------------------------------------------------------------------------------------------------------------------
415
    @StateChange.wrapper
416
    def start(self):
417
        """
418
        :rtype: bool
419
        """
420
        # Acquire the required resources of this node.
421
        self.acquire_resources()
422
423
        # Set the status of this node to running.
424
        self._set_rst_id(enarksh.ENK_RST_ID_RUNNING)
425
426
        return True
427
428
    # ------------------------------------------------------------------------------------------------------------------
429
    @StateChange.wrapper
430
    def stop(self, exit_status):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
431
        # Release all by this node consumed resources.
432
        self.release_resources()
433
434
        # Save the exit status of the job.
435
        self._exit_status = exit_status
436
437
        # Update the run status of this node based on the exit status of the job.
438
        if exit_status == 0:
439
            self._set_rst_id(enarksh.ENK_RST_ID_COMPLETED)
440
        else:
441
            self._set_rst_id(enarksh.ENK_RST_ID_ERROR)
442
443
    # ------------------------------------------------------------------------------------------------------------------
444
    def fake_get_resource_by_name(self, name):
445
        """
446
        Returns a resource.
447
448
        :param str name: string The name of the requested resource.
449
450
        :rtype: mixed
451
        """
452
        for resource in self.resources:
453
            if resource.get_name() == name:
454
                return resource
455
456
        if self._parent_node:
457
            return self._parent_node.fake_get_resource_by_name(name)
458
459
        return None
460
461
    # ------------------------------------------------------------------------------------------------------------------
462
    def sync_state(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
463
        DataLayer.enk_back_run_node_update_status(self.rnd_id,
464
                                                  self.rst_id,
465
                                                  self._rnd_datetime_start,
466
                                                  self._rnd_datetime_stop,
467
                                                  self._exit_status)
468
469
    # ------------------------------------------------------------------------------------------------------------------
470
    @abc.abstractmethod
471
    def is_simple_node(self):
472
        """
473
        Returns True if this node is a simple node. Otherwise, returns False.
474
        """
475
        raise NotImplementedError()
476
477
    # ------------------------------------------------------------------------------------------------------------------
478
    def get_uri(self, obj_type='node'):
479
        """
480
        Returns the URI of this node.
481
482
        :param str obj_type: The entity type.
483
484
        :rtype: str
485
        """
486
        if self._parent_node:
487
            uri = self._parent_node.get_uri(obj_type)
488
        else:
489
            uri = '//' + obj_type
490
491
        return uri + '/' + self._node_name
492
493
    # ------------------------------------------------------------------------------------------------------------------
494
    @abc.abstractmethod
495
    def is_complex_node(self):
496
        """
497
        Returns True if this node is a complex node. Otherwise, returns False.
498
        """
499
        raise NotImplementedError()
500
501
# ----------------------------------------------------------------------------------------------------------------------
502