GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( 28bb03...fbdd84 )
by P.R.
03:47
created

Node._rst_id_wrapper()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 8
ccs 0
cts 2
cp 0
crap 2
rs 9.4285
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
from time import strftime, gmtime
10
11
import enarksh
12
from enarksh.DataLayer import DataLayer
13
from enarksh.controller.StateChange import StateChange
14
15
16
class Node(StateChange, metaclass=abc.ABCMeta):
17
    """
18
    Abstract class for objects in the controller of type 'Node'.
19
    """
20
    # Precedence of the run statuses: slot_child_node_state_change() gives a complex node the run status with the
    # highest weight found among its child nodes.
    _rst_id_weight = {
        enarksh.ENK_RST_ID_RUNNING:   5,
        enarksh.ENK_RST_ID_QUEUED:    4,
        enarksh.ENK_RST_ID_ERROR:     3,
        enarksh.ENK_RST_ID_WAITING:   2,
        enarksh.ENK_RST_ID_COMPLETED: 1,
    }

    # Reverse lookup table (weight -> run status), derived from _rst_id_weight.
    _weight_rst_id = {weight: rst_id for rst_id, weight in _rst_id_weight.items()}
31
32
    # ------------------------------------------------------------------------------------------------------------------
33
    def __init__(self, node_data):
        """
        Object constructor.

        Reads the keys 'rnd_id', 'nod_name', 'rst_id', 'rnd_datetime_start', and 'rnd_datetime_stop' from node_data.
        All relations with other objects (resources, consumptions, parent, children, predecessors, and successors)
        start empty.

        :param dict node_data: The details of this (run) node.
        """
        StateChange.__init__(self)

        self._rnd_id = node_data['rnd_id']
        """
        The ID of this (run) node.

        :type: int
        """

        # @todo XXX DataLayer encoding issue: the name may arrive as bytes. Decode only when it is not already a
        # string, because str(<str>, 'utf-8') raises a TypeError.
        node_name = node_data['nod_name']
        self._node_name = node_name if isinstance(node_name, str) else str(node_name, 'utf-8')
        """
        The name of this node.

        :type: str
        """

        self._rst_id = node_data['rst_id']
        """
        The ID of the run status of this node.

        :type: int
        """

        self._rnd_datetime_start = node_data['rnd_datetime_start']
        """
        The epoch this node has been started.

        NOTE(review): _set_rst_id() stores a 'YYYY-mm-dd HH:MM:SS' string (UTC) here and _renew() resets it to None;
        the type supplied by node_data is not visible in this module.

        :type: None|str
        """

        self._rnd_datetime_stop = node_data['rnd_datetime_stop']
        """
        The epoch this node has finished.

        NOTE(review): _set_rst_id() stores a 'YYYY-mm-dd HH:MM:SS' string (UTC) or None here; the type supplied by
        node_data is not visible in this module.

        :type: None|str
        """

        self._exit_status = None
        """
        The exit status of the job of this node.

        NOTE(review): stop() compares the exit status with 0, so this is presumably an int; confirm against the
        callers of stop().

        :type: None|bool
        """

        self.consumptions = []
        """
        The consumptions of this node.

        :type: list[enarksh.controller.consumption.Consumption.Consumption]
        """

        self.resources = []
        """
        The resources of this node.

        :type: list[enarksh.controller.resource.Resource.Resource]
        """

        self._scheduling_weight = 0
        """
        The weight of this node to be taken into account when sorting queued nodes.

        :type: int
        """

        self._parent_node = None
        """
        The parent node of this node.

        :type: None|enarksh.controller.node.Node.Node
        """

        self._child_nodes = []
        """
        The child nodes of this node. This list is empty for simple nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._predecessor_nodes = []
        """
        The direct (simple) predecessor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._successor_nodes = []
        """
        The direct (simple) successor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """
131
132
    # ------------------------------------------------------------------------------------------------------------------
133
    @property
    def name(self):
        """
        The name of this node (read-only).

        :rtype: str
        """
        node_name = self._node_name

        return node_name
141
142
    # ------------------------------------------------------------------------------------------------------------------
143
    @property
    def rnd_id(self):
        """
        The ID of this (run) node (read-only).

        :rtype: int
        """
        run_node_id = self._rnd_id

        return run_node_id
151
152
    # ------------------------------------------------------------------------------------------------------------------
153
    @property
    def rst_id(self):
        """
        The ID of the run status of this node.

        :rtype: int
        """
        run_status_id = self._rst_id

        return run_status_id
161
162
    # ------------------------------------------------------------------------------------------------------------------
163
    @StateChange.wrapper
    def _rst_id_wrapper(self, rst_id):  # XXX Hack!
        """
        Stores the run status of this node.

        This helper exists so that the assignment done by the rst_id setter runs under StateChange.wrapper.

        :param int rst_id: The ID of the run status.
        """
        self._rst_id = rst_id
171
172
    # ------------------------------------------------------------------------------------------------------------------
173
    @rst_id.setter
    def rst_id(self, rst_id):
        """
        Sets the run status of this node by delegating to _rst_id_wrapper().

        :param int rst_id: The ID of the run status.
        """
        self._rst_id_wrapper(rst_id)
181
182
    # ------------------------------------------------------------------------------------------------------------------
183
    @property
    def schedule_wait(self):
        """
        The scheduling weight of this node, i.e. the number of (direct and indirect) simple successors.

        :rtype: int
        """
        weight = self._scheduling_weight

        return weight
191
192
    # ------------------------------------------------------------------------------------------------------------------
193
    def get_state_attributes(self):
        """
        Returns the attributes that make up the state of this node: the ID of this node and the ID of its run
        status.

        :rtype: dict[str,int]
        """
        return dict(rnd_id=self.rnd_id, rst_id=self.rst_id)
199
200
    # ------------------------------------------------------------------------------------------------------------------
201
    def __del__(self):
        """
        Finalizer of this node. Intentionally does nothing.
        """
204
205
    # ------------------------------------------------------------------------------------------------------------------
206
    def destroy(self):
        """
        Detaches this node from all other objects: unregisters all observers of this node and drops the references
        to its parent, children, predecessors, successors, resources, and consumptions.
        """
        StateChange.unregister_all_observers(self)

        # Drop the references to other nodes.
        self._parent_node = None
        self._child_nodes = []
        self._predecessor_nodes = []
        self._successor_nodes = []

        # Drop the references to resources and consumptions.
        self.resources = []
        self.consumptions = []
214
215
    # ------------------------------------------------------------------------------------------------------------------
216
    def acquire_resources(self):
        """
        Acquires the resources required by this node by asking each consumption of this node, in order, to acquire
        its resource.
        """
        for required in self.consumptions:
            required.acquire_resource()
222
223
    # ------------------------------------------------------------------------------------------------------------------
224
    def inquire_resources(self):
        """
        Returns true when there are enough resources available to start this node. Otherwise returns false.

        The consumptions are inquired in order and the inquiry stops at the first consumption that cannot be
        satisfied. A node without consumptions can always be started.

        :rtype: bool
        """
        available = True

        for required in self.consumptions:
            available = required.inquire_resource()
            if not available:
                # No need to inquire the remaining consumptions.
                return available

        return available
238
239
    # ------------------------------------------------------------------------------------------------------------------
240
    def release_resources(self):
        """
        Releases the resources required by this node by asking each consumption of this node, in order, to release
        its resource.
        """
        for required in self.consumptions:
            required.release_resource()
246
247
    # ------------------------------------------------------------------------------------------------------------------
248
    def _recompute_run_status(self):
        """
        Recomputes the run status of this node based on the run statuses of its direct predecessors.

        * When all predecessors have run status completed, this node is (if required) renewed and queued.
        * When at least one predecessor is neither completed nor in error and this node is not waiting, this node is
          (if required) renewed and set to waiting.

        Nothing happens when this node has no predecessors.
        """
        if not self._predecessor_nodes:
            return

        statuses = [predecessor.rst_id for predecessor in self._predecessor_nodes]

        any_not_completed = any(status != enarksh.ENK_RST_ID_COMPLETED for status in statuses)
        any_not_finished = any(status != enarksh.ENK_RST_ID_COMPLETED and status != enarksh.ENK_RST_ID_ERROR
                               for status in statuses)

        if not any_not_completed:
            # All predecessors have run status completed.
            self._renew()
            self._set_rst_id(enarksh.ENK_RST_ID_QUEUED)
        elif any_not_finished and self.rst_id != enarksh.ENK_RST_ID_WAITING:
            # A predecessor has been restarted.
            self._renew()
            self._set_rst_id(enarksh.ENK_RST_ID_WAITING)
268
269
    # ------------------------------------------------------------------------------------------------------------------
270
    def _set_rst_id(self, rst_id):
        """
        Sets the run status of this node and maintains the start and stop datetime of this node.

        * On run status running: the start datetime is set (unless already set) and the stop datetime is cleared.
        * On a change to run status completed or error: the stop datetime is set.

        Datetimes are stored as 'YYYY-mm-dd HH:MM:SS' strings in UTC.

        :param int rst_id: The new run status for this node.
        """
        datetime_format = "%Y-%m-%d %H:%M:%S"

        previous_rst_id = self.rst_id
        self._rst_id = rst_id

        if rst_id == enarksh.ENK_RST_ID_RUNNING:
            # Keep the start datetime when one is already set.
            if not self._rnd_datetime_start:
                self._rnd_datetime_start = strftime(datetime_format, gmtime())
            self._rnd_datetime_stop = None

        # Only an actual change to a final run status updates the stop datetime.
        is_final = rst_id in (enarksh.ENK_RST_ID_COMPLETED, enarksh.ENK_RST_ID_ERROR)
        if is_final and previous_rst_id != rst_id:
            self._rnd_datetime_stop = strftime(datetime_format, gmtime())
288
289
    # ------------------------------------------------------------------------------------------------------------------
290
    @StateChange.wrapper
    def slot_child_node_state_change(self, node, old, new):
        """
        Slot registered as observer on the child nodes of this node (see initialize()). Recomputes the run status of
        this complex node: it gets the run status with the highest weight (see _rst_id_weight) among its child nodes.

        The arguments are not used; the run statuses of all child nodes are inspected instead.

        :param node: Not used.
        :param dict old: Not used.
        :param dict new: Not used.
        """
        # The leading 0 is the weight when this node has no child nodes.
        weights = [0] + [Node._rst_id_weight[child_node.rst_id] for child_node in self._child_nodes]

        # Update the run status of this node.
        self._set_rst_id(self._weight_rst_id[max(weights)])
304
305
    # ------------------------------------------------------------------------------------------------------------------
306
    @StateChange.wrapper
    def slot_predecessor_node_state_change(self, node, old, new):
        """
        Slot registered as observer on the direct predecessors of this node (see initialize()). Recomputes the run
        status of this node when the run status of the predecessor has changed.

        :param node: Not used.
        :param dict old: The old state of the predecessor; only key 'rst_id' is read.
        :param dict new: The new state of the predecessor; only key 'rst_id' is read.
        """
        if old['rst_id'] == new['rst_id']:
            # The run status of the predecessor is unchanged.
            return

        self._recompute_run_status()
315
316
    # ------------------------------------------------------------------------------------------------------------------
317
    def _renew(self):
        """
        If required renews this node, i.e. creates a new row in ENK_RUN_NODE.

        Renewing is only required when the run status of this node is error or completed. The renewed node gets the
        ID returned by the data layer and run status waiting; its start datetime, stop datetime, and exit status are
        cleared.
        """
        if self.rst_id not in (enarksh.ENK_RST_ID_ERROR, enarksh.ENK_RST_ID_COMPLETED):
            return

        self._rnd_id = DataLayer.enk_back_run_node_renew(self.rnd_id)
        self._rst_id = enarksh.ENK_RST_ID_WAITING
        self._rnd_datetime_start = self._rnd_datetime_stop = None
        self._exit_status = None
327
328
    # ------------------------------------------------------------------------------------------------------------------
329
    def initialize(self,
                   node_data,
                   schedule,
                   resources,
                   resources_data,
                   consumptions,
                   consumptions_data,
                   run_nodes,
                   child_nodes,
                   direct_predecessors,
                   direct_successors,
                   successors):
        """
        Wires this node to its resources, consumptions, and related nodes. Unless stated otherwise the dictionaries
        are keyed by the ID of a (run) node; a node without an entry has no such relations.

        :param dict node_data: The details of this node; only key 'rnd_id_parent' is read.
        :param dict schedule: Not used.
        :param dict resources: The resource objects, keyed by rsc_id.
        :param dict resources_data: Per node a list of dictionaries with key 'rsc_id'.
        :param dict consumptions: The consumption objects, keyed by cns_id.
        :param dict consumptions_data: Per node a list of dictionaries with key 'cns_id'.
        :param dict run_nodes: All node objects, keyed by rnd_id.
        :param dict child_nodes: Per node a list of dictionaries with key 'rnd_id'.
        :param dict direct_predecessors: Per node a list with the IDs of its direct predecessors.
        :param dict direct_successors: Per node a list with the IDs of its direct successors.
        :param dict successors: Per node a collection with its (direct and indirect) successors.
        """
        my_id = self.rnd_id

        # Initialize the resources of this node.
        self.resources.extend(resources[row['rsc_id']] for row in resources_data.get(my_id, ()))

        # Initialize the consumptions of this node.
        self.consumptions.extend(consumptions[row['cns_id']] for row in consumptions_data.get(my_id, ()))

        # Observe all direct predecessor nodes of this node (for simple nodes only).
        for predecessor_id in direct_predecessors.get(my_id, ()):
            predecessor = run_nodes[predecessor_id]
            predecessor.register_observer(self.slot_predecessor_node_state_change)
            self._predecessor_nodes.append(predecessor)

        # Observe the child run_nodes of this node (for complex nodes only).
        for row in child_nodes.get(my_id, ()):
            child = run_nodes[row['rnd_id']]
            child.register_observer(self.slot_child_node_state_change)
            self._child_nodes.append(child)

        # Set the parent node of this node.
        parent_id = node_data['rnd_id_parent']
        if parent_id:
            self._parent_node = run_nodes[parent_id]

        # Remember the direct successor nodes of this node (they are not observed).
        self._successor_nodes.extend(run_nodes[successor_id] for successor_id in direct_successors.get(my_id, ()))

        # Set scheduling weight, i.e. the number of (direct and indirect) successors.
        if my_id in successors:
            self._scheduling_weight = len(successors[my_id])
392
393
    # ------------------------------------------------------------------------------------------------------------------
394
    def get_start_message(self):
        """
        Returns the message for starting this node: a dictionary with the message type ('start_node') and the ID of
        this node.

        :rtype: dict[str,str|int]
        """
        return {'type': 'start_node', 'rnd_id': self.rnd_id}
402
403
    # ------------------------------------------------------------------------------------------------------------------
404
    @abc.abstractmethod
    def restart(self):
        """
        Restarts this node. Must be implemented by concrete node classes.

        :raises NotImplementedError: Always, when this base implementation is invoked.
        """
        raise NotImplementedError
410
411
    # ------------------------------------------------------------------------------------------------------------------
412
    @abc.abstractmethod
    def restart_failed(self):
        """
        Restarts all failed simple nodes. Must be implemented by concrete node classes.

        :raises NotImplementedError: Always, when this base implementation is invoked.
        """
        raise NotImplementedError
418
419
    # ------------------------------------------------------------------------------------------------------------------
420
    @StateChange.wrapper
    def start(self):
        """
        Starts this node: acquires the resources required by this node and sets its run status to running.

        :rtype: bool
        """
        # Claim the resources before the node is marked as running.
        self.acquire_resources()
        self._set_rst_id(enarksh.ENK_RST_ID_RUNNING)

        return True
432
433
    # ------------------------------------------------------------------------------------------------------------------
434
    @StateChange.wrapper
    def stop(self, exit_status):
        """
        Stops this node: releases the resources consumed by this node, saves the exit status of the job, and sets
        the run status of this node to completed (exit status 0) or error (any other exit status).

        :param int exit_status: The exit status of the job of this node.
        """
        # Release all by this node consumed resources.
        self.release_resources()

        # Save the exit status of the job.
        self._exit_status = exit_status

        # Update the run status of this node based on the exit status of the job.
        self._set_rst_id(enarksh.ENK_RST_ID_COMPLETED if exit_status == 0 else enarksh.ENK_RST_ID_ERROR)
447
448
    # ------------------------------------------------------------------------------------------------------------------
449
    def fake_get_resource_by_name(self, name):
        """
        Returns the first resource with the given name among the resources of this node. When this node has no such
        resource the lookup continues at the parent node. Returns None when no resource has been found.

        :param str name: The name of the requested resource.

        :rtype: mixed
        """
        own_match = next((candidate for candidate in self.resources if candidate.get_name() == name), None)
        if own_match is not None:
            return own_match

        # Not found at this level; try the ancestors (if any).
        return self._parent_node.fake_get_resource_by_name(name) if self._parent_node else None
465
466
    # ------------------------------------------------------------------------------------------------------------------
467
    def sync_state(self):
        """
        Passes the current state of this node (ID, run status, start datetime, stop datetime, and exit status) to
        DataLayer.enk_back_run_node_update_status().
        """
        state = (self.rnd_id,
                 self.rst_id,
                 self._rnd_datetime_start,
                 self._rnd_datetime_stop,
                 self._exit_status)

        DataLayer.enk_back_run_node_update_status(*state)
473
474
    # ------------------------------------------------------------------------------------------------------------------
475
    @abc.abstractmethod
    def is_simple_node(self):
        """
        Returns True if this node is a simple node. Otherwise, returns False. Must be implemented by concrete node
        classes.

        :raises NotImplementedError: Always, when this base implementation is invoked.
        """
        raise NotImplementedError
481
482
    # ------------------------------------------------------------------------------------------------------------------
483
    def get_uri(self, obj_type='node'):
        """
        Returns the URI of this node: '//<obj_type>' followed by the names of all ancestors of this node (from the
        root downwards) and the name of this node, each preceded by a slash.

        :param str obj_type: The entity type.

        :rtype: str
        """
        # The URI of the parent is the prefix; a node without parent starts the URI.
        prefix = self._parent_node.get_uri(obj_type) if self._parent_node else '//' + obj_type

        return '/'.join((prefix, self._node_name))
497
498
    # ------------------------------------------------------------------------------------------------------------------
499
    @abc.abstractmethod
    def is_complex_node(self):
        """
        Returns True if this node is a complex node. Otherwise, returns False. Must be implemented by concrete node
        classes.

        :raises NotImplementedError: Always, when this base implementation is invoked.
        """
        raise NotImplementedError
505
506
# ----------------------------------------------------------------------------------------------------------------------
507