1 | """ |
||
2 | Enarksh |
||
3 | |||
4 | Copyright 2013-2016 Set Based IT Consultancy |
||
5 | |||
6 | Licence MIT |
||
7 | """ |
||
8 | import abc |
||
9 | import logging |
||
10 | from time import strftime, localtime |
||
11 | |||
12 | from enarksh.C import C |
||
13 | from enarksh.DataLayer import DataLayer |
||
14 | from enarksh.controller.StateChange import StateChange |
||
15 | |||
16 | |||
class Node(StateChange, metaclass=abc.ABCMeta):
    """
    Abstract class for objects in the controller of type 'Node'.

    A node keeps track of its run status, its resources and consumptions, and its relations with other nodes (parent,
    children, direct predecessors and direct successors). Concrete subclasses must implement restart(),
    restart_failed(), is_simple_node() and is_complex_node().
    """
    # Priority of each run status when deriving the run status of a complex node from its child nodes: the status with
    # the highest weight among the children wins (see child_node_event_state_change_handler).
    _rst_id_weight = {C.ENK_RST_ID_RUNNING: 5,
                      C.ENK_RST_ID_QUEUED: 4,
                      C.ENK_RST_ID_ERROR: 3,
                      C.ENK_RST_ID_WAITING: 2,
                      C.ENK_RST_ID_COMPLETED: 1}

    # Inverse of _rst_id_weight (weight -> run status ID). Derived from it so the two tables can never drift apart.
    _weight_rst_id = {weight: rst_id for rst_id, weight in _rst_id_weight.items()}

    event_new_node_creation = None
    """
    The event that will be fired when a new node has been created. Must be set (by code outside this class) before the
    first node is instantiated, because the constructor fires it unconditionally.

    :type: enarksh.event.Event.Event
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, node_data):
        """
        Object constructor.

        :param dict node_data: The details of this node. The keys 'rnd_id', 'nod_name', 'rst_id',
                               'rnd_datetime_start' and 'rnd_datetime_stop' are read.
        """
        StateChange.__init__(self)

        self._rnd_id = node_data['rnd_id']
        """
        The ID of this (run) node.

        :type: int
        """

        # @todo XXX DataLayer encoding issue
        # The data layer currently delivers the name as bytes. Accept a str as well, so that this constructor keeps
        # working once the data layer returns properly decoded strings (str(some_str, 'utf-8') raises a TypeError).
        nod_name = node_data['nod_name']
        self._node_name = nod_name if isinstance(nod_name, str) else str(nod_name, 'utf-8')
        """
        The name of this node.

        :type: str
        """

        self._rst_id = node_data['rst_id']
        """
        The ID of the run status of this node.

        :type: int
        """

        self._rnd_datetime_start = node_data['rnd_datetime_start']
        """
        The moment this node has been started. When set by this class it is a string formatted as
        '%Y-%m-%d %H:%M:%S' (local time); the type of the initial value depends on node_data.

        :type: str|None
        """

        self._rnd_datetime_stop = node_data['rnd_datetime_stop']
        """
        The moment this node has finished. When set by this class it is a string formatted as
        '%Y-%m-%d %H:%M:%S' (local time); the type of the initial value depends on node_data.

        :type: str|None
        """

        self._exit_status = None
        """
        The exit status of the job of this node.

        :type: None|bool
        """

        self.consumptions = []
        """
        The consumptions of this node.

        :type: list[enarksh.controller.consumption.Consumption.Consumption]
        """

        self.resources = []
        """
        The resources of this node.

        :type: list[enarksh.controller.resource.Resource.Resource]
        """

        self._scheduling_weight = 0
        """
        The weight of this node to be taken into account when sorting queued nodes.

        :type: int
        """

        self._parent_node = None
        """
        The parent node of this node.

        :type: enarksh.controller.node.Node.Node|None
        """

        self._child_nodes = []
        """
        The child nodes of this node. This list is empty for simple nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._predecessor_nodes = []
        """
        The direct (simple) predecessor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        self._successor_nodes = []
        """
        The direct (simple) successor nodes of this node. This list is empty for complex nodes.

        :type: list[enarksh.controller.node.Node.Node]
        """

        Node.event_new_node_creation.fire(self)

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def name(self):
        """
        Getter for name. Returns the name of this node.

        :rtype: str
        """
        return self._node_name

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def rnd_id(self):
        """
        Getter for rnd_id. Returns the ID of this node.

        :rtype: int
        """
        return self._rnd_id

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def rst_id(self):
        """
        Getter for rst_id. Returns the ID of the run status of this node.

        :rtype: int
        """
        return self._rst_id

    # ------------------------------------------------------------------------------------------------------------------
    @StateChange.wrapper
    def __rst_id_wrapper(self, rst_id):
        """
        Setter for rst_id. Sets the run status of this node.

        This is a separate method so that only the actual assignment is decorated with StateChange.wrapper.

        :param int rst_id: The ID of the run status.
        """
        self._rst_id = rst_id

    # ------------------------------------------------------------------------------------------------------------------
    @rst_id.setter
    def rst_id(self, rst_id):
        """
        Sets the run status of this node and maintains the start and stop datetime of this node accordingly.

        :param int rst_id: The new run status for this node.
        """
        old_rst_id = self.rst_id
        self.__rst_id_wrapper(rst_id)

        # Update the start datetime of this node.
        if rst_id == C.ENK_RST_ID_RUNNING:
            # Keep an already known start datetime.
            if not self._rnd_datetime_start:
                self._rnd_datetime_start = strftime("%Y-%m-%d %H:%M:%S", localtime())
            # A running node has not stopped (yet).
            self._rnd_datetime_stop = None

        # Update the stop datetime of this node (only on an actual transition to a finished state).
        if old_rst_id != rst_id and rst_id in (C.ENK_RST_ID_COMPLETED, C.ENK_RST_ID_ERROR):
            self._rnd_datetime_stop = strftime("%Y-%m-%d %H:%M:%S", localtime())

    # ------------------------------------------------------------------------------------------------------------------
    @property
    def scheduling_weight(self):
        """
        Return the scheduling weight (i.e. the number (direct and indirect) of simple successors).

        :rtype: int
        """
        return self._scheduling_weight

    # ------------------------------------------------------------------------------------------------------------------
    def get_state_attributes(self):
        """
        Returns the attributes that make up the state of this node: the ID and the ID of the run status of this node.

        :rtype: dict[str,int]
        """
        return {'rnd_id': self.rnd_id,
                'rst_id': self.rst_id}

    # ------------------------------------------------------------------------------------------------------------------
    def __del__(self):
        """
        Object destructor. Logs (at debug level) the run status of the node being deleted.
        """
        log = logging.getLogger('enarksh')
        # The constructor may have failed before _rst_id was set; a destructor must not raise in that case.
        log.debug('Deleting node: rst_id: {}'.format(getattr(self, '_rst_id', None)))

    # ------------------------------------------------------------------------------------------------------------------
    def acquire_resources(self):
        """
        Acquires the resources required by this node.
        """
        for consumption in self.consumptions:
            consumption.acquire_resource()

    # ------------------------------------------------------------------------------------------------------------------
    def inquire_resources(self):
        """
        Returns true when there are enough resources available to start this node. Otherwise returns false. A node
        without consumptions can always be started.

        :rtype: bool
        """
        # all() stops at the first consumption that can not be satisfied.
        return all(consumption.inquire_resource() for consumption in self.consumptions)

    # ------------------------------------------------------------------------------------------------------------------
    def release_resources(self):
        """
        Releases the resources required by this node.
        """
        for consumption in self.consumptions:
            consumption.release_resource()

    # ------------------------------------------------------------------------------------------------------------------
    def _recompute_run_status(self):
        """
        Recomputes the run status of this node based on the run statuses of the predecessor nodes of this node. Does
        nothing when this node has no predecessor nodes.
        """
        if not self._predecessor_nodes:
            return

        # Take a snapshot of the run statuses before this node is modified.
        statuses = [predecessor.rst_id for predecessor in self._predecessor_nodes]
        all_completed = all(status == C.ENK_RST_ID_COMPLETED for status in statuses)
        any_not_finished = any(status not in (C.ENK_RST_ID_COMPLETED, C.ENK_RST_ID_ERROR) for status in statuses)

        if all_completed:
            # All predecessors have run status completed.
            self._renew()
            self.rst_id = C.ENK_RST_ID_QUEUED
        elif any_not_finished and self.rst_id != C.ENK_RST_ID_WAITING:
            # A predecessor has been restarted.
            self._renew()
            self.rst_id = C.ENK_RST_ID_WAITING

    # ------------------------------------------------------------------------------------------------------------------
    def child_node_event_state_change_handler(self, _event, _event_data, _listener_data):
        """
        Event handler for a state change of a child node.

        :param * _event: Not used.
        :param * _event_data: Not used.
        :param * _listener_data: Not used.
        """
        del _event, _event_data, _listener_data

        # Compute the running status of this complex node based on the running statuses of its child nodes: the run
        # status with the highest weight wins.
        weight = 0
        for child_node in self._child_nodes:
            weight = max(weight, Node._rst_id_weight[child_node.rst_id])

        # Update the run status of this node.
        self.rst_id = self._weight_rst_id[weight]

    # ------------------------------------------------------------------------------------------------------------------
    def predecessor_node_event_state_change_handler(self, _event, event_data, _listener_data):
        """
        Event handler for a state change of a predecessor node.

        :param * _event: Not used.
        :param tuple[dict,dict] event_data: The old and new state.
        :param * _listener_data: Not used.
        """
        del _event, _listener_data

        old, new = event_data

        # Only a change of the run status of the predecessor can affect the run status of this node.
        if old['rst_id'] != new['rst_id']:
            self._recompute_run_status()

    # ------------------------------------------------------------------------------------------------------------------
    @StateChange.wrapper
    def _renew(self):
        """
        If required renews this node, i.e. creates a new row in ENK_RUN_NODE. A node is renewed only when its run
        status is error or completed; its run status is then reset to waiting.
        """
        if self._rst_id in (C.ENK_RST_ID_ERROR, C.ENK_RST_ID_COMPLETED):
            self._rnd_id = DataLayer.enk_back_run_node_renew(self.rnd_id)
            self._rst_id = C.ENK_RST_ID_WAITING
            self._rnd_datetime_start = None
            self._rnd_datetime_stop = None
            self._exit_status = None

    # ------------------------------------------------------------------------------------------------------------------
    def initialize(self,
                   node_data,
                   schedule,
                   resources,
                   resources_data,
                   consumptions,
                   consumptions_data,
                   run_nodes,
                   child_nodes,
                   direct_predecessors,
                   direct_successors,
                   successors):
        """
        Initializes this node: links this node with its resources, consumptions and related nodes.

        :param dict node_data: The details of this node. Only key 'rnd_id_parent' is read.
        :param dict schedule: Not used by this method.
        :param dict resources: The resource objects, keyed by rsc_id.
        :param dict resources_data: Per rnd_id the details (with key 'rsc_id') of the resources of the node.
        :param dict consumptions: The consumption objects, keyed by cns_id.
        :param dict consumptions_data: Per rnd_id the details (with key 'cns_id') of the consumptions of the node.
        :param dict run_nodes: The node objects, keyed by rnd_id.
        :param dict child_nodes: Per rnd_id the details (with key 'rnd_id') of the child nodes of the node.
        :param dict direct_predecessors: Per rnd_id the IDs of the direct predecessors of the node.
        :param dict direct_successors: Per rnd_id the IDs of the direct successors of the node.
        :param dict successors: Per rnd_id the (direct and indirect) successors of the node.
        """
        # Initialize the resources of this node.
        for resource_data in resources_data.get(self.rnd_id, ()):
            self.resources.append(resources[resource_data['rsc_id']])

        # Initialize the consumptions of this node.
        for consumption_data in consumptions_data.get(self.rnd_id, ()):
            self.consumptions.append(consumptions[consumption_data['cns_id']])

        # Observe all direct predecessor nodes of this node (for simple nodes only).
        for predecessor in direct_predecessors.get(self.rnd_id, ()):
            node = run_nodes[predecessor]
            node.event_state_change.register_listener(self.predecessor_node_event_state_change_handler)
            self._predecessor_nodes.append(node)

        # Observe the child run_nodes of this node (for complex nodes only).
        for child_node in child_nodes.get(self.rnd_id, ()):
            node = run_nodes[child_node['rnd_id']]
            node.event_state_change.register_listener(self.child_node_event_state_change_handler)
            self._child_nodes.append(node)

        # Set the parent node of this node.
        if node_data['rnd_id_parent']:
            self._parent_node = run_nodes[node_data['rnd_id_parent']]

        # Set the direct successor nodes of this node.
        for successor in direct_successors.get(self.rnd_id, ()):
            self._successor_nodes.append(run_nodes[successor])

        # Set scheduling weight, i.e. the number of (direct and indirect) successors.
        if self.rnd_id in successors:
            self._scheduling_weight = len(successors[self.rnd_id])

    # ------------------------------------------------------------------------------------------------------------------
    def get_start_message(self, sch_id):
        """
        Returns the message to be sent to the spawner for starting this node. Raises an exception if this node can not
        be started by the spawner (e.g. a complex node). This base implementation always raises.

        :param int sch_id: The ID of the schedule.

        :rtype: enarksh.message.Message.Message
        """
        raise RuntimeError("Node of class '{}' can not be started by the spawner".format(self.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    def start(self):
        """
        Does the housekeeping for starting this node. Returns True if an actual job must be started by the spawner.
        Returns False otherwise. This base implementation always raises a RuntimeError.

        :rtype: bool
        """
        raise RuntimeError("Node of class '{}' can not be marked started".format(self.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    def stop(self, exit_status):
        """
        Does the housekeeping when the node has stopped. This base implementation always raises a RuntimeError.

        :param int exit_status: The exit status of the job.
        """
        raise RuntimeError("Node of class '{}' can not be marked stopped".format(self.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def restart(self):
        """
        Restarts this node.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def restart_failed(self):
        """
        Restarts all failed simple nodes.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def fake_get_resource_by_name(self, name):
        """
        Returns the resource with a given name. The resources of this node are searched first, then the resources of
        its ancestors (nearest first). Returns None when no such resource exists.

        :param str name: The name of the requested resource.

        :rtype: enarksh.controller.resource.Resource.Resource|None
        """
        for resource in self.resources:
            if resource.get_name() == name:
                return resource

        if self._parent_node:
            return self._parent_node.fake_get_resource_by_name(name)

        return None

    # ------------------------------------------------------------------------------------------------------------------
    def sync_state(self):
        """
        Updates the state of this node into the database.
        """
        DataLayer.enk_back_run_node_update_status(self.rnd_id,
                                                  self.rst_id,
                                                  self._rnd_datetime_start,
                                                  self._rnd_datetime_stop,
                                                  self._exit_status)

    # ------------------------------------------------------------------------------------------------------------------
    def get_uri(self, obj_type='node'):
        """
        Returns the URI of this node, i.e. '//<obj_type>' followed by the names of all ancestors of this node and the
        name of this node, each preceded by a slash.

        :param str obj_type: The entity type.

        :rtype: str
        """
        if self._parent_node:
            uri = self._parent_node.get_uri(obj_type)
        else:
            uri = '//' + obj_type

        return uri + '/' + self._node_name

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def is_simple_node(self):
        """
        Returns True if this node is a simple node. Otherwise, returns False.

        :rtype: bool
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    @abc.abstractmethod
    def is_complex_node(self):
        """
        Returns True if this node is a complex node. Otherwise, returns False.

        :rtype: bool
        """
        raise NotImplementedError()
||
512 | |||
513 | # ---------------------------------------------------------------------------------------------------------------------- |
||
514 |