Passed
Push — master ( dbdfc9...f5fa38 )
by P.R.
03:36
created

Node.rst_id()   B

Complexity

Conditions 5

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 5
c 2
b 0
f 0
dl 0
loc 8
ccs 0
cts 2
cp 0
crap 30
rs 8.5454
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import abc
9
from time import strftime, gmtime
10
11
import enarksh
12
from enarksh.DataLayer import DataLayer
13
from enarksh.controller.StateChange import StateChange
14
15
16
class Node(StateChange, metaclass=abc.ABCMeta):
17
    """
18
    Abstract class for objects in the controller of type 'Node'.
19
    """
20
    # Precedence of the run statuses: when the run status of a complex node is derived from its child nodes the
    # status with the highest weight wins (see child_node_event_state_change_handler).
    _rst_id_weight = {enarksh.ENK_RST_ID_RUNNING:   5,
                      enarksh.ENK_RST_ID_QUEUED:    4,
                      enarksh.ENK_RST_ID_ERROR:     3,
                      enarksh.ENK_RST_ID_WAITING:   2,
                      enarksh.ENK_RST_ID_COMPLETED: 1}

    # Reverse lookup (weight -> run status). Derived from the table above so both tables can never get out of sync.
    _weight_rst_id = {weight: rst_id for rst_id, weight in _rst_id_weight.items()}
31
32
    event_new_node_creation = None
33
    """
34
    The event that will be fired when a new node has been created.
35
36
    :type: enarksh.event.Event.Event
37
    """
38
39
    # ------------------------------------------------------------------------------------------------------------------
40
    def __init__(self, node_data):
        """
        Object constructor.

        Copies the identifying data of the node from node_data, creates all relations with other nodes as empty
        collections and finally fires the class level event event_new_node_creation with this node as argument.

        :param dict node_data: The data of this node. The keys 'rnd_id', 'nod_name', 'rst_id', 'rnd_datetime_start',
                               and 'rnd_datetime_stop' are read.
        """
        StateChange.__init__(self)

        self._rnd_id = node_data['rnd_id']
        """
        The ID of this (run) node.

        :type: int
        """

        # @todo XXX DataLayer encoding issue: presumably the data layer hands over the name as bytes. A name that
        # already is a str is taken as is, because str(<str>, 'utf-8') raises a TypeError.
        node_name = node_data['nod_name']
        if not isinstance(node_name, str):
            node_name = str(node_name, 'utf-8')

        self._node_name = node_name
        """
        The name of this node.

        :type: str
        """

        self._rst_id = node_data['rst_id']
        """
        The ID of the run status of this node.

        :type: int
        """

        self._rnd_datetime_start = node_data['rnd_datetime_start']
        """
        The moment this node has been started. The setter of rst_id stores a 'YYYY-MM-DD HH:MM:SS' string (UTC) here;
        the type of the initial value depends on node_data.

        :type: str|None
        """

        self._rnd_datetime_stop = node_data['rnd_datetime_stop']
        """
        The moment this node has finished. The setter of rst_id stores a 'YYYY-MM-DD HH:MM:SS' string (UTC) or None
        here; the type of the initial value depends on node_data.

        :type: str|None
        """

        self._exit_status = None
        """
        The exit status of the job of this node.

        :type: None|bool
        """

        self.consumptions = []
        """
        The consumptions of this node.

        :type: list[enarksh.controller.consumption.Consumption.Consumption]
        """

        self.resources = []
        """
        The resources of this node.

        :type: list[enarksh.controller.resource.Resource.Resource]
        """

        self._scheduling_weight = 0
        """
        The weight of this node to be taken into account when sorting queued nodes.

        :type: int
        """

        self._parent_node = None
        """
        The parent node of this node.

        :type: enarksh.controller.node.Node.Node|None
        """

        self._child_nodes = []
        """
        The child nodes of this node. This list is empty for simple nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._predecessor_nodes = []
        """
        The direct (simple) predecessor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._successor_nodes = []
        """
        The direct (simple) successor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        # Announce the new node. NOTE(review): event_new_node_creation is None at class definition; it must have been
        # assigned an event object before the first node is created.
        Node.event_new_node_creation.fire(self)
140
141
    # ------------------------------------------------------------------------------------------------------------------
142
    @property
    def name(self):
        """
        The name of this node (read-only).

        :rtype: str
        """
        return self._node_name
150
151
    # ------------------------------------------------------------------------------------------------------------------
152
    @property
    def rnd_id(self):
        """
        The ID of this (run) node (read-only).

        :rtype: int
        """
        return self._rnd_id
160
161
    # ------------------------------------------------------------------------------------------------------------------
162
    @property
    def rst_id(self):
        """
        The ID of the current run status of this node.

        :rtype: int
        """
        return self._rst_id
170
171
    # ------------------------------------------------------------------------------------------------------------------
172
    @StateChange.wrapper
    def __rst_id_wrapper(self, rst_id):
        """
        Stores the run status of this node. The actual assignment is done in this separate method such that it is
        executed under the StateChange.wrapper decorator.

        :param int rst_id: The ID of the run status.
        """
        self._rst_id = rst_id
180
181
    # ------------------------------------------------------------------------------------------------------------------
182
    @rst_id.setter
    def rst_id(self, rst_id):
        """
        Sets the run status of this node and maintains the start and stop datetime of this node accordingly.

        :param int rst_id: The new run status for this node.
        """
        timestamp_format = "%Y-%m-%d %H:%M:%S"
        finished_statuses = (enarksh.ENK_RST_ID_COMPLETED, enarksh.ENK_RST_ID_ERROR)

        previous_rst_id = self.rst_id
        self.__rst_id_wrapper(rst_id)

        if rst_id == enarksh.ENK_RST_ID_RUNNING:
            # A node that starts running keeps an already known start datetime and has no stop datetime (anymore).
            self._rnd_datetime_start = self._rnd_datetime_start or strftime(timestamp_format, gmtime())
            self._rnd_datetime_stop = None

        if rst_id in finished_statuses and previous_rst_id != rst_id:
            # The node has just finished (successfully or not): record the stop datetime (UTC).
            self._rnd_datetime_stop = strftime(timestamp_format, gmtime())
201
202
    # ------------------------------------------------------------------------------------------------------------------
203
    @property
    def scheduling_weight(self):
        """
        The scheduling weight of this node, i.e. the number of (direct and indirect) simple successors.

        :rtype: int
        """
        return self._scheduling_weight
211
212
    # ------------------------------------------------------------------------------------------------------------------
213
    def get_state_attributes(self):
        """
        Returns the attributes that make up the state of this node: the ID of this node and the ID of its run status.

        :rtype: dict[str,int]
        """
        state = dict()
        state['rnd_id'] = self.rnd_id
        state['rst_id'] = self.rst_id

        return state
219
220
    # ------------------------------------------------------------------------------------------------------------------
221
    def __del__(self):
        """
        Object destructor. Intentionally does nothing.
        """
227
228
    # ------------------------------------------------------------------------------------------------------------------
229
    def acquire_resources(self):
        """
        Acquires the resources required by this node by asking each consumption of this node, in order, to acquire
        its resource.
        """
        for item in self.consumptions:
            item.acquire_resource()
235
236
    # ------------------------------------------------------------------------------------------------------------------
237
    def inquire_resources(self):
        """
        Returns True when there are enough resources available to start this node. Otherwise returns False.

        The consumptions are inquired in order and the inquiry stops at the first consumption that can not be
        satisfied. A node without consumptions can always be started.

        :rtype: bool
        """
        return all(consumption.inquire_resource() for consumption in self.consumptions)
251
252
    # ------------------------------------------------------------------------------------------------------------------
253
    def release_resources(self):
        """
        Releases the resources required by this node by asking each consumption of this node, in order, to release
        its resource.
        """
        for item in self.consumptions:
            item.release_resource()
259
260
    # ------------------------------------------------------------------------------------------------------------------
261
    def _recompute_run_status(self):
        """
        Recomputes the run status of this node based on the run statuses of the predecessor nodes of this node. Does
        nothing when this node has no predecessor nodes.
        """
        if not self._predecessor_nodes:
            return

        statuses = [predecessor.rst_id for predecessor in self._predecessor_nodes]
        finished = (enarksh.ENK_RST_ID_COMPLETED, enarksh.ENK_RST_ID_ERROR)

        all_completed = all(status == enarksh.ENK_RST_ID_COMPLETED for status in statuses)
        some_unfinished = any(status not in finished for status in statuses)

        if all_completed:
            # All predecessors have run status completed: this node can be queued.
            self._renew()
            self.rst_id = enarksh.ENK_RST_ID_QUEUED

        if some_unfinished and self.rst_id != enarksh.ENK_RST_ID_WAITING:
            # A predecessor has been restarted: this node must wait (again).
            self._renew()
            self.rst_id = enarksh.ENK_RST_ID_WAITING
284
285
    # ------------------------------------------------------------------------------------------------------------------
286
    @StateChange.wrapper
    def child_node_event_state_change_handler(self, _event, _event_data, _listener_data):
        """
        Event handler for a state change of a child node. Derives the run status of this (complex) node from the run
        statuses of all its child nodes.

        :param * _event: Not used.
        :param * _event_data: The old and new state. Not used.
        :param * _listener_data: Not used
        """
        del _event, _event_data, _listener_data

        # The run status with the highest weight among the child nodes determines the run status of this node.
        child_weights = [Node._rst_id_weight[child.rst_id] for child in self._child_nodes]
        heaviest = max([0] + child_weights)

        self.rst_id = self._weight_rst_id[heaviest]
304
305
    # ------------------------------------------------------------------------------------------------------------------
306
    @StateChange.wrapper
    def predecessor_node_event_state_change_handler(self, _event, event_data, _listener_data):
        """
        Event handler for a state change of a predecessor node. Recomputes the run status of this node when the run
        status of the predecessor has actually changed.

        :param * _event: Not used.
        :param tuple[dict,dict] event_data: The old and new state.
        :param * _listener_data: Not used
        """
        del _event, _listener_data

        old_state, new_state = event_data
        if old_state['rst_id'] == new_state['rst_id']:
            # The run status of the predecessor is unchanged: nothing to recompute.
            return

        self._recompute_run_status()
321
322
    # ------------------------------------------------------------------------------------------------------------------
323
    def _renew(self):
        """
        If required renews this node, i.e. creates a new row in ENK_RUN_NODE. Only a node with run status error or
        completed is renewed; in all other cases this method does nothing.
        """
        if self.rst_id not in (enarksh.ENK_RST_ID_ERROR, enarksh.ENK_RST_ID_COMPLETED):
            return

        self._rnd_id = DataLayer.enk_back_run_node_renew(self.rnd_id)

        # Reset the run data. The run status is assigned directly, i.e. without going through the rst_id setter.
        self._rst_id = enarksh.ENK_RST_ID_WAITING
        self._exit_status = None
        self._rnd_datetime_start = None
        self._rnd_datetime_stop = None
333
334
    # ------------------------------------------------------------------------------------------------------------------
335
    def initialize(self,
                   node_data,
                   schedule,
                   resources,
                   resources_data,
                   consumptions,
                   consumptions_data,
                   run_nodes,
                   child_nodes,
                   direct_predecessors,
                   direct_successors,
                   successors):
        """
        Initializes this node: wires this node to its resources, consumptions, and related nodes.

        All *_data and relation dictionaries are keyed by the ID of a run node; an absent key means that this node
        has no such relations.

        :param dict node_data: The data of this node. Only key 'rnd_id_parent' is read.
        :param dict schedule: Not used by this method.
        :param dict resources: The resource objects, keyed by rsc_id.
        :param dict resources_data: Per node a list of dicts with key 'rsc_id'.
        :param dict consumptions: The consumption objects, keyed by cns_id.
        :param dict consumptions_data: Per node a list of dicts with key 'cns_id'.
        :param dict run_nodes: The node objects, keyed by rnd_id.
        :param dict child_nodes: Per node a list of dicts with key 'rnd_id'.
        :param dict direct_predecessors: Per node a list with the IDs of the direct predecessor nodes.
        :param dict direct_successors: Per node a list with the IDs of the direct successor nodes.
        :param dict successors: Per node a collection with all (direct and indirect) successors.
        """
        rnd_id = self.rnd_id

        # The resources and consumptions of this node.
        self.resources.extend(resources[row['rsc_id']] for row in resources_data.get(rnd_id, ()))
        self.consumptions.extend(consumptions[row['cns_id']] for row in consumptions_data.get(rnd_id, ()))

        # Observe all direct predecessor nodes of this node (for simple nodes only).
        for predecessor_id in direct_predecessors.get(rnd_id, ()):
            predecessor = run_nodes[predecessor_id]
            predecessor.event_state_change.register_listener(self.predecessor_node_event_state_change_handler)
            self._predecessor_nodes.append(predecessor)

        # Observe the child nodes of this node (for complex nodes only).
        for child_data in child_nodes.get(rnd_id, ()):
            child = run_nodes[child_data['rnd_id']]
            child.event_state_change.register_listener(self.child_node_event_state_change_handler)
            self._child_nodes.append(child)

        # The parent node of this node (if any).
        parent_id = node_data['rnd_id_parent']
        if parent_id:
            self._parent_node = run_nodes[parent_id]

        # The direct successor nodes of this node.
        self._successor_nodes.extend(run_nodes[successor_id] for successor_id in direct_successors.get(rnd_id, ()))

        # The scheduling weight, i.e. the number of (direct and indirect) successors.
        if rnd_id in successors:
            self._scheduling_weight = len(successors[rnd_id])
400
401
    # ------------------------------------------------------------------------------------------------------------------
402
    def get_start_message(self, sch_id):
        """
        Returns the message to be sent to the spawner for starting this node. This base implementation always raises
        an exception: a node of this class can not be started by the spawner (e.g. a complex node).

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.message.Message.Message

        :raises RuntimeError: Always.
        """
        message = "Node of class '{}' can not be started by the spawner".format(self.__class__)

        raise RuntimeError(message)
412
413
    # ------------------------------------------------------------------------------------------------------------------
414
    def start(self):
        """
        Does the housekeeping for starting this node. Returns True if an actual job must be started by the spawner.
        Returns False otherwise. This base implementation always raises an exception.

        :rtype: bool

        :raises RuntimeError: Always.
        """
        message = "Node of class '{}' can not be marked started".format(self.__class__)

        raise RuntimeError(message)
422
423
    # ------------------------------------------------------------------------------------------------------------------
424
    def stop(self, exit_status):
        """
        Does the housekeeping when the node has stopped. This base implementation always raises an exception.

        :param int exit_status: The exit status of the job.

        :raises RuntimeError: Always.
        """
        message = "Node of class '{}' can not be marked stopped".format(self.__class__)

        raise RuntimeError(message)
431
432
    # ------------------------------------------------------------------------------------------------------------------
433
    @abc.abstractmethod
    def restart(self):
        """
        Restarts this node. Must be implemented by concrete node classes.
        """
        raise NotImplementedError
439
440
    # ------------------------------------------------------------------------------------------------------------------
441
    @abc.abstractmethod
    def restart_failed(self):
        """
        Restarts all failed simple nodes. Must be implemented by concrete node classes.
        """
        raise NotImplementedError
447
448
    # ------------------------------------------------------------------------------------------------------------------
449
    def fake_get_resource_by_name(self, name):
        """
        Returns the resource with a given name. The resources of this node are searched first; when none matches the
        search continues at the parent node of this node. Returns None when no resource is found.

        :param str name: The name of the requested resource.

        :rtype: enarksh.controller.resource.Resource.Resource|None
        """
        own_matches = (candidate for candidate in self.resources if candidate.get_name() == name)
        for match in own_matches:
            return match

        parent = self._parent_node
        if not parent:
            return None

        return parent.fake_get_resource_by_name(name)
465
466
    # ------------------------------------------------------------------------------------------------------------------
467
    def sync_state(self):
        """
        Writes the state of this node (run status, start and stop datetime, and exit status) into the database.
        """
        state = (self.rnd_id,
                 self.rst_id,
                 self._rnd_datetime_start,
                 self._rnd_datetime_stop,
                 self._exit_status)

        DataLayer.enk_back_run_node_update_status(*state)
476
477
    # ------------------------------------------------------------------------------------------------------------------
478
    def get_uri(self, obj_type='node'):
        """
        Returns the URI of this node: the URI of the parent node (or '//' followed by the entity type for a node
        without parent) followed by a slash and the name of this node.

        :param str obj_type: The entity type.

        :rtype: str
        """
        parent = self._parent_node
        prefix = parent.get_uri(obj_type) if parent else '//' + obj_type

        return prefix + '/' + self._node_name
492
493
    # ------------------------------------------------------------------------------------------------------------------
494
    @abc.abstractmethod
    def is_simple_node(self):
        """
        Returns True if this node is a simple node. Otherwise, returns False. Must be implemented by concrete node
        classes.

        :rtype: bool
        """
        raise NotImplementedError
502
503
    # ------------------------------------------------------------------------------------------------------------------
504
    @abc.abstractmethod
    def is_complex_node(self):
        """
        Returns True if this node is a complex node. Otherwise, returns False. Must be implemented by concrete node
        classes.

        :rtype: bool
        """
        raise NotImplementedError
512
513
# ----------------------------------------------------------------------------------------------------------------------
514