Total Complexity | 51 |
Total Lines | 526 |
Duplicated Lines | 0 % |
Coverage | 0% |
Changes | 2 | ||
Bugs | 0 | Features | 0 |
Complex classes like Controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """ |
||
25 | class Controller: |
||
|
|||
26 | _instance = None |
||
27 | |||
28 | # ------------------------------------------------------------------------------------------------------------------ |
||
29 | def __init__(self): |
||
30 | Controller._instance = self |
||
31 | |||
32 | self._zmq_context = None |
||
33 | """ |
||
34 | :type: zmq.Context |
||
35 | """ |
||
36 | |||
37 | self._zmq_pull_socket = zmq.sugar.socket.Socket |
||
38 | """ |
||
39 | :type: zmq.sugar.socket.Socket |
||
40 | """ |
||
41 | |||
42 | self._zmq_lockstep_socket = None |
||
43 | """ |
||
44 | :type: zmq.sugar.socket.Socket |
||
45 | """ |
||
46 | |||
47 | self._zmq_spanner = None |
||
48 | """ |
||
49 | :type: Socket |
||
50 | """ |
||
51 | |||
52 | self._zmq_logger = None |
||
53 | """ |
||
54 | :type: Socket |
||
55 | """ |
||
56 | |||
57 | self._host_resources = {} |
||
58 | """ |
||
59 | All resources defined at host level. |
||
60 | |||
61 | :type: dict |
||
62 | """ |
||
63 | |||
64 | self._schedules = {} |
||
65 | """ |
||
66 | All the current schedules. |
||
67 | |||
68 | :type: dict |
||
69 | """ |
||
70 | |||
71 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _zmq_init(self):
    """
    Creates the ZeroMQ context and the four sockets used by the controller.

    Two sockets are bound for incoming traffic (PULL and REP) and two PUSH sockets are connected for outgoing
    traffic (to the spawner and to the logger).
    """
    context = zmq.Context()
    self._zmq_context = context

    # Incoming asynchronous messages.
    self._zmq_pull_socket = context.socket(zmq.PULL)
    self._zmq_pull_socket.bind(enarksh.CONTROLLER_PULL_END_POINT)

    # Incoming lockstep (request/reply) messages.
    self._zmq_lockstep_socket = context.socket(zmq.REP)
    self._zmq_lockstep_socket.bind(enarksh.CONTROLLER_LOCKSTEP_END_POINT)

    # Outgoing asynchronous messages to the spawner.
    self._zmq_spanner = context.socket(zmq.PUSH)
    self._zmq_spanner.connect(enarksh.SPAWNER_PULL_END_POINT)

    # Outgoing asynchronous messages to the logger.
    self._zmq_logger = context.socket(zmq.PUSH)
    self._zmq_logger.connect(enarksh.LOGGER_PULL_END_POINT)
||
90 | |||
91 | # ------------------------------------------------------------------------------------------------------------------ |
||
@staticmethod
def daemonize():
    """
    Delegates to enarksh.daemonize() with the controller's PID file, '/dev/null' and the controller's log file.

    NOTE(review): the four arguments look like (pid file, stdin, stdout, stderr) -- confirm against
    enarksh.daemonize().
    """
    pid_file = enarksh.HOME + '/var/lock/controllerd.pid'
    log_file = enarksh.HOME + '/var/log/controllerd.log'

    # The same log file is passed for both of the last two arguments.
    enarksh.daemonize(pid_file, '/dev/null', log_file, log_file)
||
98 | |||
99 | # ------------------------------------------------------------------------------------------------------------------ |
||
@staticmethod
def _set_unprivileged_user():
    """
    Sets the real and effective user and group to those of the account 'enarksh'.

    The saved set-user-ID and saved set-group-ID are both set to 0.
    """
    account = pwd.getpwnam('enarksh')

    # Group first, then user.
    os.setresgid(account.pw_gid, account.pw_gid, 0)
    os.setresuid(account.pw_uid, account.pw_uid, 0)
||
109 | |||
110 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _create_host_resources(self):
    """
    Creates the resources defined at host level and registers them by resource ID.

    Each row returned by DataLayer.enk_back_get_host_resources() is turned into a resource object and stored
    in self._host_resources under the row's 'rsc_id'.
    """
    for row in DataLayer.enk_back_get_host_resources():
        resource = Resource.create_resource(row)
        self._host_resources[row['rsc_id']] = resource
||
118 | |||
119 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _startup(self):
    """
    Performs the necessary actions for starting the controller daemon.

    Configures and opens the database connection, initialises the database state and the host resources, sets
    up the ZeroMQ sockets, registers the schedule termination observer and finally commits and disconnects.
    """
    print('Start controller.')

    # Database configuration options, applied in this order.
    settings = (('host', enarksh.MYSQL_HOSTNAME),
                ('user', enarksh.MYSQL_USERNAME),
                ('password', enarksh.MYSQL_PASSWORD),
                ('database', enarksh.MYSQL_SCHEMA),
                ('port', enarksh.MYSQL_PORT),
                ('autocommit', False))
    for option, value in settings:
        DataLayer.config[option] = value

    # Open the connection to MySQL.
    DataLayer.connect()

    # Sanitise the data in the database (per the original comment on this call).
    DataLayer.enk_back_controller_init()

    # Load the resources defined at host level.
    self._create_host_resources()

    # Dropping privileges and becoming a daemon are currently disabled:
    # self._set_unprivileged_user()
    # self.__daemonize()

    self._zmq_init()

    # Get notified when a schedule terminates, so it can be unloaded.
    Schedule.register_observer_schedule_termination(self.slot_schedule_termination)

    # Commit the transaction and close the connection to MySQL.
    DataLayer.commit()
    DataLayer.disconnect()
||
157 | |||
158 | # ------------------------------------------------------------------------------------------------------------------ |
||
159 | def slot_schedule_termination(self, schedule, rst_id): |
||
160 | """ |
||
161 | :param enarksh.Controller.Schedule.Schedule schedule: |
||
162 | :param int rst_id: |
||
163 | """ |
||
164 | print("Schedule %s has terminated with status %s" % (schedule.get_sch_id(), rst_id)) |
||
165 | |||
166 | self._unload_schedule(schedule.get_sch_id()) |
||
167 | |||
168 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _load_schedule(self, sch_id):
    """
    Creates the schedule object for a schedule ID and registers it among the current schedules.

    :param int sch_id: The ID of the schedule.

    :rtype: enarksh.Controller.Schedule.Schedule
    """
    print("Loading schedule '%s'." % sch_id)

    # Build the schedule with access to the host level resources and register it under its ID.
    loaded = Schedule(sch_id, self._host_resources)
    self._schedules[sch_id] = loaded

    return loaded
||
184 | |||
185 | # ------------------------------------------------------------------------------------------------------------------ |
||
186 | def _unload_schedule(self, sch_id): |
||
187 | """ |
||
188 | :param int sch_id: |
||
189 | """ |
||
190 | print("Unloading schedule '%s'." % sch_id) |
||
191 | |||
192 | if sch_id in self._schedules: |
||
193 | schedule = self._schedules[sch_id] |
||
194 | |||
195 | # Remove the schedule. |
||
196 | del self._schedules[sch_id] |
||
197 | |||
198 | schedule.destroy() |
||
199 | |||
200 | gc.collect() |
||
201 | |||
202 | # ------------------------------------------------------------------------------------------------------------------ |
||
203 | def _reload_schedule(self, sch_id): |
||
204 | """ |
||
205 | :param int sch_id: |
||
206 | |||
207 | :rtype: enarksh.Controller.Schedule.Schedule |
||
208 | """ |
||
209 | self._unload_schedule(sch_id) |
||
210 | |||
211 | return self._load_schedule(sch_id) |
||
212 | |||
213 | # ------------------------------------------------------------------------------------------------------------------ |
||
214 | @staticmethod |
||
215 | def queue_compare(schedule1, schedule2): |
||
216 | """ |
||
217 | Compares two schedules for sorting queued nodes. |
||
218 | |||
219 | :param enarksh.Controller.Schedule.Schedule schedule1: |
||
220 | :param enarksh.Controller.Schedule.Schedule schedule2: |
||
221 | |||
222 | :rtype: |
||
223 | """ |
||
224 | return -(schedule1.get_schedule_load() - schedule2.get_schedule_load) |
||
225 | |||
226 | # ------------------------------------------------------------------------------------------------------------------ |
||
227 | def _queue_handler(self): |
||
228 | """ |
||
229 | Starts node that are queued and for which there are enough resources available. |
||
230 | """ |
||
231 | # Return immediately if there are no loaded schedules. |
||
232 | if not self._schedules: |
||
233 | return |
||
234 | |||
235 | DataLayer.connect() |
||
236 | DataLayer.start_transaction() |
||
237 | |||
238 | while True: |
||
239 | start = False |
||
240 | if self._schedules: |
||
241 | schedules = sorted(self._schedules, key=functools.cmp_to_key(Controller.queue_compare)) |
||
242 | schedule = self._schedules[schedules[0]] |
||
243 | queue = schedule.get_queue() |
||
244 | for node in queue: |
||
245 | # Inquire if there are enough resources available for the node. |
||
246 | start = node.inquire_resources() |
||
247 | if start: |
||
248 | span_job = node.start() |
||
249 | |||
250 | # If required send a message to the spanner. |
||
251 | if span_job: |
||
252 | message = node.get_start_message() |
||
253 | message['sch_id'] = schedule.get_sch_id() |
||
254 | self._zmq_spanner.send_json(message) |
||
255 | |||
256 | else: |
||
257 | node.stop(0) |
||
258 | |||
259 | # If a node has been started leave inner loop. |
||
260 | break |
||
261 | |||
262 | # If no node has been started leave the outer loop. |
||
263 | if not start: |
||
264 | break |
||
265 | |||
266 | # Commit the last transaction (if any) and close the connection to the database. |
||
267 | DataLayer.commit() |
||
268 | DataLayer.disconnect() |
||
269 | |||
270 | # ------------------------------------------------------------------------------------------------------------------ |
||
271 | def _get_schedule_by_sch_id(self, sch_id): |
||
272 | """ |
||
273 | Returns a schedule. |
||
274 | |||
275 | :param int sch_id: The ID of the schedule. |
||
276 | |||
277 | :rtype: enarksh.Controller.Schedule.Schedule |
||
278 | """ |
||
279 | schedule = self._schedules.get(int(sch_id), None) |
||
280 | if not schedule: |
||
281 | # Load the schedule if the schedule is not currently loaded. |
||
282 | schedule = self._load_schedule(sch_id) |
||
283 | |||
284 | return schedule |
||
285 | |||
286 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _get_possible_node_actions(self, sch_id, rnd_id):
    """
    Returns the possible actions for a node.

    A message with all actions disabled and both mail flags off is built first. It is returned as is when no
    schedule is available; otherwise the schedule completes it for the node.

    :param int sch_id: The ID of the schedule of the node.
    :param int rnd_id: The ID of the node.

    :rtype: dict[str, bool|dict[int, dict[str, mixed]]]
    """
    titles = ((enarksh.ENK_ACT_ID_TRIGGER, 'Trigger'),
              (enarksh.ENK_ACT_ID_RESTART, 'Restart'),
              (enarksh.ENK_ACT_ID_RESTART_FAILED, 'Restart Failed'))

    # By default every action is known but disabled.
    actions = {}
    for act_id, act_title in titles:
        actions[act_id] = {'act_id': act_id,
                           'act_title': act_title,
                           'act_enabled': False}

    message = {'actions': actions,
               'mail_on_completion': False,
               'mail_on_error': False}

    # Find the schedule of the node and let it decide which actions are possible.
    schedule = self._get_schedule_by_sch_id(sch_id)
    if schedule:
        return schedule.request_possible_node_actions(rnd_id, message)

    # Node rnd_id is not part of a current run of a current schedule revision.
    return message
||
315 | |||
316 | # ------------------------------------------------------------------------------------------------------------------ |
||
317 | def _message_handler_request_possible_node_actions(self, message): |
||
318 | """ |
||
319 | Handles a request (from the web interface) for possible actions of a certain node. |
||
320 | |||
321 | :param dict message: The message of the request. |
||
322 | """ |
||
323 | sch_id = int(message['sch_id']) |
||
324 | rnd_id = int(message['rnd_id']) |
||
325 | |||
326 | response = self._get_possible_node_actions(sch_id, rnd_id) |
||
327 | |||
328 | # Send the message to the web interface. |
||
329 | self._zmq_lockstep_socket.send_json(response) |
||
330 | |||
331 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _message_handler_schedule_definition(self, message):
    """
    Handles a request (from the operator) for loading a new schedule.

    The outcome is always reported back over the lockstep socket: 'ret' is 0 on success and -1 on failure,
    and 'message' holds a description.

    :param dict message: The message of the request. The keys 'xml' and 'filename' are read.
    """
    try:
        # Parse the definition (validating the XML against the XSD, per the original comment).
        xml_reader = XmlReader()
        schedule = xml_reader.parse_schedule(message['xml'], message['filename'])

        # Refuse a schedule that is currently running.
        # NOTE(review): self._schedules appears to be keyed by schedule ID elsewhere in this class, while this
        # test uses the schedule name -- confirm that this check can ever succeed.
        name = schedule.name
        if name in self._schedules:
            raise Exception("Schedule '%s' is currently running." % name)

        # Insert the XML definition as a BLOB into the database and create a schedule revision for it.
        blb_id = DataLayer.enk_blob_insert_blob(os.path.basename(message['filename']), 'text/xml', message['xml'])
        srv_id = DataLayer.enk_reader_schedule_create_revision(blb_id, name)
        if not srv_id:
            raise Exception("Schedule '%s' is already loaded." % name)

        # Store the new schedule definition into the database and create a run for the revision.
        schedule.store(srv_id, 1)
        DataLayer.enk_back_schedule_revision_create_run(srv_id)
    except Exception as exception:
        # Report the failure both on stderr and to the requester.
        print(exception, file=sys.stderr)
        response = {'ret': -1,
                    'message': str(exception)}
        traceback.print_exc(file=sys.stderr)
    else:
        response = {'ret': 0,
                    'message': "Schedule '%s' successfully loaded." % name}

    self._zmq_lockstep_socket.send_json(response)
||
367 | |||
368 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _message_handler_dynamic_worker_definition(self, message):
    """
    Handles a request for loading a dynamic worker definition.

    The outcome is always reported back over the lockstep socket: 'ret' is 0 on success and -1 on failure,
    and 'message' holds a description.

    :param dict message: The message of the request. The keys 'sch_id', 'rnd_id' and 'definition' are read.
    """
    try:
        sch_id = int(message['sch_id'])
        rnd_id = int(message['rnd_id'])

        print('rnd_id: ' + str(rnd_id))

        # Get info of the dynamic node.
        info = DataLayer.enk_back_run_node_get_dynamic_info_by_generator(rnd_id)

        # The outer worker acts as the parent of the definition that is about to be parsed.
        schedule = self._get_schedule_by_sch_id(sch_id)
        parent = FakeParent(schedule,
                            self._host_resources,
                            info['nod_id_outer_worker'],
                            info['rnd_id_outer_worker'])

        # Parse the definition (validating the XML against the XSD, per the original comment).
        xml_reader = XmlReader()
        inner_worker = xml_reader.parse_dynamic_worker(message['definition'], parent)
        name = inner_worker.name

        # Note: Dynamic node is the parent of the worker node which is the parent of the inner worker node.
        inner_worker.set_levels(info['nod_recursion_level'] + 2)

        # Store the dynamically defined inner worker node.
        inner_worker.store(info['srv_id'], 0)

        # Create dependencies between the input and output port of the worker node and its child node(s).
        DataLayer.enk_back_node_dynamic_add_dependencies(info['nod_id_outer_worker'], inner_worker.nod_id)

        # XXX trigger reload of front end

        # Unload the schedule to force a reload of the schedule with new nodes added.
        self._unload_schedule(sch_id)
    except Exception as exception:
        # Report the failure both on stderr and to the requester.
        print(exception, file=sys.stderr)
        response = {'ret': -1,
                    'message': str(exception)}
        traceback.print_exc(file=sys.stderr)
    else:
        response = {'ret': 0,
                    'message': "Worker '%s' successfully loaded." % name}

    self._zmq_lockstep_socket.send_json(response)
||
419 | |||
420 | # ------------------------------------------------------------------------------------------------------------------ |
||
421 | def _message_handler_request_node_action(self, message): |
||
422 | """ |
||
423 | Executes a node action request. |
||
424 | |||
425 | :param dict message: dict The message of the request. |
||
426 | """ |
||
427 | # Compose a response message for the web interface. |
||
428 | response = {'ret': 0, |
||
429 | 'new_run': 0, |
||
430 | 'message': 'OK'} |
||
431 | |||
432 | try: |
||
433 | sch_id = int(message['sch_id']) |
||
434 | rnd_id = int(message['rnd_id']) |
||
435 | act_id = int(message['act_id']) |
||
436 | |||
437 | actions = self._get_possible_node_actions(sch_id, rnd_id) |
||
438 | if act_id not in actions['actions'] or not actions['actions'][act_id]['act_enabled']: |
||
439 | response['ret'] = -1 |
||
440 | response['message'] = 'Not a valid action' |
||
441 | else: |
||
442 | schedule = self._get_schedule_by_sch_id(sch_id) |
||
443 | reload = schedule.request_node_action(rnd_id, |
||
444 | act_id, |
||
445 | message['usr_login'], |
||
446 | (message['mail_on_completion']), |
||
447 | (message['mail_on_error'])) |
||
448 | if reload: |
||
449 | # Schedule must be reloaded. |
||
450 | schedule = self._reload_schedule(schedule.sch_id) |
||
451 | # A reload is only required when the schedule is been triggered. However, this trigger is lost by |
||
452 | # reloading the schedule. So, resend the trigger. |
||
453 | schedule.request_node_action(schedule.get_activate_node().get_rnd_id(), |
||
454 | act_id, |
||
455 | message['usr_login'], |
||
456 | (message['mail_on_completion']), |
||
457 | (message['mail_on_error'])) |
||
458 | |||
459 | if act_id == enarksh.ENK_ACT_ID_TRIGGER: |
||
460 | response['new_run'] = 1 |
||
461 | |||
462 | except Exception as exception: |
||
463 | print(exception, file=sys.stderr) |
||
464 | traceback.print_exc(file=sys.stderr) |
||
465 | |||
466 | response['ret'] = -1 |
||
467 | response['message'] = 'Internal error' |
||
468 | |||
469 | # Send the message to the web interface. |
||
470 | self._zmq_lockstep_socket.send_json(response) |
||
471 | |||
472 | # ------------------------------------------------------------------------------------------------------------------ |
||
473 | def _message_handler_node_stop(self, message): |
||
474 | """ |
||
475 | Handles a message sent by the spanner after a job has finished. |
||
476 | |||
477 | :param dict message: |
||
478 | """ |
||
479 | schedule = self._get_schedule_by_sch_id(message['sch_id']) |
||
480 | schedule.event_node_stop(message['rnd_id'], message['exit_status']) |
||
481 | |||
482 | # ------------------------------------------------------------------------------------------------------------------ |
||
def _message_handler(self):
    """
    Waits for new messages and processes these messages.

    Blocks until a message is available on the pull socket and/or the lockstep socket, then handles at most
    one message from each inside a database transaction.

    :raises Exception: When a message has an unknown type. For a lockstep message an error reply is sent
                       before the exception is raised.
    """
    poller = zmq.Poller()
    poller.register(self._zmq_pull_socket, zmq.POLLIN)
    poller.register(self._zmq_lockstep_socket, zmq.POLLIN)

    socks = dict(poller.poll())

    DataLayer.connect()
    DataLayer.start_transaction()

    if self._zmq_pull_socket in socks:
        message = self._zmq_pull_socket.recv_json()

        if message['type'] == 'node_stop':
            self._message_handler_node_stop(message)

        else:
            raise Exception("Unknown message type '%s'." % message['type'])

    if self._zmq_lockstep_socket in socks:
        message = self._zmq_lockstep_socket.recv_json()

        if message['type'] == 'schedule_definition':
            self._message_handler_schedule_definition(message)

        elif message['type'] == 'request_node_action':
            self._message_handler_request_node_action(message)

        elif message['type'] == 'request_possible_node_actions':
            self._message_handler_request_possible_node_actions(message)

        elif message['type'] == 'dynamic_worker_definition':
            self._message_handler_dynamic_worker_definition(message)

        else:
            error = "Unknown message type '%s'." % message['type']
            # A REP socket must answer every request before it can receive the next one. Reply first, in the
            # same shape as the other handlers, so the requester is not left waiting and the socket stays
            # usable, and only then report the problem to the caller.
            self._zmq_lockstep_socket.send_json({'ret': -1,
                                                 'message': error})
            raise Exception(error)

    DataLayer.commit()
    DataLayer.disconnect()
||
525 | |||
526 | # ------------------------------------------------------------------------------------------------------------------ |
||
def main(self):
    """
    The main function of the job controller.

    Starts the controller and then loops forever: handle incoming messages, then try to start queued nodes.
    Any exception is reported on stderr after an attempt to roll back and close the database connection.
    """
    def report(error):
        # Print the error followed by the traceback of the exception currently being handled.
        print(error, file=sys.stderr)
        traceback.print_exc(file=sys.stderr)

    # Perform the necessary actions for starting the controller.
    self._startup()

    while True:
        try:
            # Wait for new messages and process these messages.
            self._message_handler()

            # Try to start nodes that are queued.
            self._queue_handler()

        except Exception as failure:
            # Best effort clean up of the database connection; a failure here is reported as well.
            try:
                DataLayer.rollback()
                DataLayer.disconnect()
            except Exception as cleanup_failure:
                report(cleanup_failure)

            report(failure)
||
551 | |||
554 |
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend reading PEP 257: Docstring Conventions.