GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Passed
Pull Request — master (#4)
by Oleg
02:28
created

Node.rnd_id()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 8
ccs 0
cts 2
cp 0
crap 2
rs 9.4285
c 0
b 0
f 0
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
from time import strftime, gmtime
10
11
import enarksh
12
from enarksh.DataLayer import DataLayer
13
from enarksh.Controller.StateChange import StateChange
14
15
16
class Node(StateChange):
17
    """
18
    Abstract class for objects in the controller of type 'Node'.
19
    """
20
    _rst_id_weight = {enarksh.ENK_RST_ID_RUNNING: 5,
21
                      enarksh.ENK_RST_ID_QUEUED: 4,
22
                      enarksh.ENK_RST_ID_ERROR: 3,
23
                      enarksh.ENK_RST_ID_WAITING: 2,
24
                      enarksh.ENK_RST_ID_COMPLETED: 1}
25
26
    _weight_rst_id = {5: enarksh.ENK_RST_ID_RUNNING,
27
                      4: enarksh.ENK_RST_ID_QUEUED,
28
                      3: enarksh.ENK_RST_ID_ERROR,
29
                      2: enarksh.ENK_RST_ID_WAITING,
30
                      1: enarksh.ENK_RST_ID_COMPLETED}
31
32
    # ------------------------------------------------------------------------------------------------------------------
33
    def __init__(self, node_data):
34
        """
35
        Object constructor
36
37
        :param dict node_data:
38
        """
39
        StateChange.__init__(self)
40
41
        self._rnd_id = node_data['rnd_id']
42
        """
43
        The ID of this (run) node.
44
45
        :type:
46
        """
47
48
        self._node_name = str(node_data['nod_name'], 'utf-8')  # @todo XXX DataLayer encoding issue
49
        """
50
        The name of this node.
51
52
        :type: str
53
        """
54
55
        self._rst_id = node_data['rst_id']
56
        """
57
        The ID of the run status of this node.
58
59
        :type:
60
        """
61
62
        self._rnd_datetime_start = node_data['rnd_datetime_start']
63
        """
64
        The epoch this node has been started.
65
66
        :type:
67
        """
68
69
        self._rnd_datetime_stop = node_data['rnd_datetime_stop']
70
        """
71
        The epoch this node has finished.
72
73
        :type:
74
        """
75
76
        self._exit_status = None
77
        """
78
        The exit status of the job of this node.
79
80
        :type: bool
81
        """
82
83
        self.consumptions = []
84
        """
85
        The consumptions of this node.
86
87
        :type: list
88
        """
89
90
        self.resources = []
91
        """
92
        The resources of this node.
93
94
        :type: list
95
        """
96
97
        self._scheduling_weight = 0
98
        """
99
        The weight of this node to be taken into account when sorting queued nodes.
100
101
        :type: int
102
        """
103
104
        self._parent_node = None
105
        """
106
        The parent node of this node.
107
108
        :type: None|enarksh.Controller.Node.Node.Node
109
        """
110
111
        self._child_nodes = []
112
        """
113
        The child nodes of this node. This list is empty for simple nodes.
114
115
        :type: list
116
        """
117
118
        self._predecessor_nodes = []
119
        """
120
        The direct (simple) predecessor nodes of this node. This list is empty for complex nodes.
121
122
        :type: list
123
        """
124
125
        self._successor_nodes = []
126
        """
127
        The direct (simple) successor nodes of this node. This list is empty for complex nodes.
128
129
        :type: list
130
        """
131
132
    # ------------------------------------------------------------------------------------------------------------------
133
    @property
134
    def name(self):
135
        """
136
        Getter for name. Returns the name of this node.
137
138
        :rtype: str
139
        """
140
        return self._node_name
141
142
    # ------------------------------------------------------------------------------------------------------------------
143
    @property
144
    def rnd_id(self):
145
        """
146
        Getter for rnd_id. Returns the ID of this node.
147
148
        :rtype: int
149
        """
150
        return self._rnd_id
151
152
    # ------------------------------------------------------------------------------------------------------------------
153
    @property
154
    def rst_id(self):
155
        """
156
        Getter for rst_id. Returns the ID of the run status of this node.
157
158
        :rtype: int
159
        """
160
        return self._rst_id
161
162
    # ------------------------------------------------------------------------------------------------------------------
163
    @StateChange.wrapper
164
    @rst_id.setter
165
    def rst_id(self, rst_id):
166
        """
167
        Setter for rst_id. Sets the the run status of this node.
168
169
        :param int rst_id: The ID of the run status.
170
        """
171
        self.rst_id = rst_id
172
173
    # ------------------------------------------------------------------------------------------------------------------
174
    @property
175
    def schedule_wait(self):
176
        """
177
        Return the scheduling wait (i.e. the number (direct and indirect) of simple successors).
178
179
        :rtype: int
180
        """
181
        return self._scheduling_weight
182
183
    # ------------------------------------------------------------------------------------------------------------------
184
    def get_state_attributes(self):
185
        """
186
        :rtype: dict[str,int]
187
        """
188
        return {'rnd_id': self.rnd_id,
189
                'rst_id': self.rst_id}
190
191
    # ------------------------------------------------------------------------------------------------------------------
192
    def __del__(self):
193
        # print("Deleting node %s" % self.rnd_id)
194
        pass
195
196
    # ------------------------------------------------------------------------------------------------------------------
197
    def destroy(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
198
        StateChange.unregister_all_observers(self)
199
        self.resources = []
200
        self.consumptions = []
201
        self._child_nodes = []
202
        self._parent_node = None
203
        self._predecessor_nodes = []
204
        self._successor_nodes = []
205
206
    # ------------------------------------------------------------------------------------------------------------------
207
    def acquire_resources(self):
208
        """
209
        Acquires the resources required by this node.
210
        """
211
        for consumption in self.consumptions:
212
            consumption.acquire_resource()
213
214
    # ------------------------------------------------------------------------------------------------------------------
215
    def inquire_resources(self):
216
        """
217
        Returns true when there enough resources available to start this node. Otherwise returns false.
218
219
        :rtype: bool
220
        """
221
        ret = True
222
223
        for consumption in self.consumptions:
224
            ret = consumption.inquire_resource()
225
            if not ret:
226
                break
227
228
        return ret
229
230
    # ------------------------------------------------------------------------------------------------------------------
231
    def release_resources(self):
232
        """
233
        Releases the resources required by this node.
234
        """
235
        for consumption in self.consumptions:
236
            consumption.release_resource()
237
238
    # ------------------------------------------------------------------------------------------------------------------
239
    def _recompute_run_status(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
240
        if self._predecessor_nodes:
241
            count_not_completed = 0
242
            count_not_finished = 0
243
            for predecessor in self._predecessor_nodes:
244
                if predecessor.rst_id != enarksh.ENK_RST_ID_COMPLETED:
245
                    count_not_completed += 1
246
                if predecessor.rst_id != enarksh.ENK_RST_ID_COMPLETED and \
247
                                predecessor.rst_id != enarksh.ENK_RST_ID_ERROR:
248
                    count_not_finished += 1
249
250
            if count_not_completed == 0:
251
                # All predecessors have run status completed.
252
                self._renew()
253
                self._set_rst_id(enarksh.ENK_RST_ID_QUEUED)
254
255
            if count_not_finished != 0 and self.rst_id != enarksh.ENK_RST_ID_WAITING:
256
                # A predecessors is been restarted.
257
                self._renew()
258
                self._set_rst_id(enarksh.ENK_RST_ID_WAITING)
259
260
    # ------------------------------------------------------------------------------------------------------------------
261
    def _set_rst_id(self, rst_id):
262
        """
263
        Sets the run status of this node.
264
265
        :param int rst_id: The new run status for this node.
266
        """
267
        old_rst_id = self.rst_id
268
        self._rst_id = rst_id
269
270
        # Update the start datetime of this node.
271
        if rst_id == enarksh.ENK_RST_ID_RUNNING:
272
            if not self._rnd_datetime_start:
273
                self._rnd_datetime_start = strftime("%Y-%m-%d %H:%M:%S", gmtime())
274
            self._rnd_datetime_stop = None
275
276
        # Update the stop datetime of this node.
277
        if old_rst_id != rst_id and rst_id in (enarksh.ENK_RST_ID_COMPLETED, enarksh.ENK_RST_ID_ERROR):
278
            self._rnd_datetime_stop = strftime("%Y-%m-%d %H:%M:%S", gmtime())
279
280
    # ------------------------------------------------------------------------------------------------------------------
281
    @StateChange.wrapper
282
    def slot_child_node_state_change(self, node, old, new):
0 ignored issues
show
Unused Code introduced by
The argument new seems to be unused.
Loading history...
Unused Code introduced by
The argument old seems to be unused.
Loading history...
Unused Code introduced by
The argument node seems to be unused.
Loading history...
283
        """
284
        :param node:
285
        :param dict old:
286
        :param dict new:
287
        """
288
        # Compute the running status of this complex node based on the running statuses of its child nodes.
289
        weight = 0
290
        for child_node in self._child_nodes:
291
            weight = max(weight, Node._rst_id_weight[child_node.rst_id])
292
293
        # Update the run status of this node.
294
        self._set_rst_id(self._weight_rst_id[weight])
295
296
    # ------------------------------------------------------------------------------------------------------------------
297
    @StateChange.wrapper
298
    def slot_predecessor_node_state_change(self, node, old, new):
0 ignored issues
show
Unused Code introduced by
The argument node seems to be unused.
Loading history...
299
        """
300
        :param node:
301
        :param dict old:
302
        :param dict new:
303
        """
304
        if old['rst_id'] != new['rst_id']:
305
            self._recompute_run_status()
306
307
    # ------------------------------------------------------------------------------------------------------------------
308
    def _renew(self):
309
        """
310
        If required renews this node, i.e. creates a new row in ENK_RUN_NODE.
311
        """
312
        if self.rst_id in (enarksh.ENK_RST_ID_ERROR, enarksh.ENK_RST_ID_COMPLETED):
313
            self._rnd_id = DataLayer.enk_back_run_node_renew(self.rnd_id)
314
            self._rst_id = enarksh.ENK_RST_ID_WAITING
315
            self._rnd_datetime_start = None
316
            self._rnd_datetime_stop = None
317
            self._exit_status = None
318
319
    # ------------------------------------------------------------------------------------------------------------------
320
    def initialize(self,
321
                   node_data,
322
                   schedule,
0 ignored issues
show
Unused Code introduced by
The argument schedule seems to be unused.
Loading history...
323
                   resources,
324
                   resources_data,
325
                   consumptions,
326
                   consumptions_data,
327
                   run_nodes,
328
                   child_nodes,
329
                   direct_predecessors,
330
                   direct_successors,
331
                   successors):
332
        """
333
        :param dict node_data:
334
        :param dict schedule:
335
        :param dict resources:
336
        :param dict resources_data:
337
        :param dict consumptions:
338
        :param dict consumptions_data:
339
        :param dict run_nodes:
340
        :param dict child_nodes:
341
        :param dict direct_predecessors:
342
        :param dict direct_successors:
343
        :param dict successors:
344
        """
345
        # Initialize the resources of this node.
346
        if self.rnd_id in resources_data:
347
            for resource_data in resources_data[self.rnd_id]:
348
                self.resources.append(resources[resource_data['rsc_id']])
349
350
        # Initialize the consumptions of this node.
351
        if self.rnd_id in consumptions_data:
352
            for consumption_data in consumptions_data[self.rnd_id]:
353
                self.consumptions.append(consumptions[consumption_data['cns_id']])
354
355
        # Observe all direct predecessor nodes of this node (for simple nodes only) and initialize predecessor state
356
        # count.
357
        if self.rnd_id in direct_predecessors:
358
            for predecessor in direct_predecessors[self.rnd_id]:
359
                node = run_nodes[predecessor]
360
                node.register_observer(self.slot_predecessor_node_state_change)
361
                self._predecessor_nodes.append(node)
362
363
        # Observe the child run_nodes of this node (for complex nodes only).
364
        if self.rnd_id in child_nodes:
365
            for child_node in child_nodes[self.rnd_id]:
366
                node = run_nodes[child_node['rnd_id']]
367
                node.register_observer(self.slot_child_node_state_change)
368
                self._child_nodes.append(node)
369
370
        # Set the parent node of this node.
371
        if node_data['rnd_id_parent']:
372
            self._parent_node = run_nodes[node_data['rnd_id_parent']]
373
374
        #
375
        if self.rnd_id in direct_successors:
376
            for successor in direct_successors[self.rnd_id]:
377
                node = run_nodes[successor]
378
                self._successor_nodes.append(node)
379
380
        # Set scheduling weight, i.e. the number of (direct and indirect) successors.
381
        if self.rnd_id in successors:
382
            self._scheduling_weight = len(successors[self.rnd_id])
383
384
    # ------------------------------------------------------------------------------------------------------------------
385
    def get_start_message(self):
386
        """
387
        :rtype: dict[str,str|int]
388
        """
389
        message = {'type': 'start_node',
390
                   'rnd_id': self.rnd_id}
391
392
        return message
393
394
    # ------------------------------------------------------------------------------------------------------------------
395
    @abc.abstractmethod
396
    def restart(self):
397
        """
398
        Restarts this node.
399
        """
400
        pass
401
402
    # ------------------------------------------------------------------------------------------------------------------
403
    @abc.abstractmethod
404
    def restart_failed(self):
405
        """
406
        Restarts all failed simple nodes.
407
        """
408
        pass
409
410
    # ------------------------------------------------------------------------------------------------------------------
411
    @StateChange.wrapper
412
    def start(self):
413
        """
414
        :rtype: bool
415
        """
416
        # Acquire the required resources of this node.
417
        self.acquire_resources()
418
419
        # Set the status of this node to running.
420
        self._set_rst_id(enarksh.ENK_RST_ID_RUNNING)
421
422
        return True
423
424
    # ------------------------------------------------------------------------------------------------------------------
425
    @StateChange.wrapper
426
    def stop(self, exit_status):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
427
        # Release all by this node consumed resources.
428
        self.release_resources()
429
430
        # Save the exit status of the job.
431
        self._exit_status = exit_status
432
433
        # Update the run status of this node based on the exit status of the job.
434
        if exit_status == 0:
435
            self._set_rst_id(enarksh.ENK_RST_ID_COMPLETED)
436
        else:
437
            self._set_rst_id(enarksh.ENK_RST_ID_ERROR)
438
439
    # ------------------------------------------------------------------------------------------------------------------
440
    def fake_get_resource_by_name(self, name):
441
        """
442
        Returns a resource.
443
444
        :param str name: string The name of the requested resource.
445
446
        :rtype: mixed
447
        """
448
        for resource in self.resources:
449
            if resource.get_name() == name:
450
                return resource
451
452
        if self._parent_node:
453
            return self._parent_node.fake_get_resource_by_name(name)
454
455
        return None
456
457
    # ------------------------------------------------------------------------------------------------------------------
458
    def sync_state(self):
0 ignored issues
show
Coding Style introduced by
This method should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.

Loading history...
459
        DataLayer.enk_back_run_node_update_status(self.rnd_id,
460
                                                  self.rst_id,
461
                                                  self._rnd_datetime_start,
462
                                                  self._rnd_datetime_stop,
463
                                                  self._exit_status)
464
465
    # ------------------------------------------------------------------------------------------------------------------
466
    @abc.abstractmethod
467
    def is_simple_node(self):
468
        """
469
        Returns True if this node is a simple node. Otherwise, returns False.
470
        """
471
        pass
472
473
    # ------------------------------------------------------------------------------------------------------------------
474
    def get_uri(self, obj_type='node'):
475
        """
476
        Returns the URI of this node.
477
478
        :param str obj_type: The entity type.
479
480
        :rtype: str
481
        """
482
        if self._parent_node:
483
            uri = self._parent_node.get_uri(obj_type)
484
        else:
485
            uri = '//' + obj_type
486
487
        return uri + '/' + self._node_name
488
489
    # ------------------------------------------------------------------------------------------------------------------
490
    @abc.abstractmethod
491
    def is_complex_node(self):
492
        """
493
        Returns True if this node is a complex node. Otherwise, returns False.
494
        """
495
        pass
496
497
498
# ----------------------------------------------------------------------------------------------------------------------
499