| Total Complexity | 51 |
| Total Lines | 526 |
| Duplicated Lines | 0 % |
| Coverage | 0% |
| Changes | 2 | ||
| Bugs | 0 | Features | 0 |
Complex classes like Controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """ |
||
| 25 | class Controller: |
||
|
|
|||
| 26 | _instance = None |
||
| 27 | |||
| 28 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 29 | def __init__(self): |
||
| 30 | Controller._instance = self |
||
| 31 | |||
| 32 | self._zmq_context = None |
||
| 33 | """ |
||
| 34 | :type: zmq.Context |
||
| 35 | """ |
||
| 36 | |||
| 37 | self._zmq_pull_socket = zmq.sugar.socket.Socket |
||
| 38 | """ |
||
| 39 | :type: zmq.sugar.socket.Socket |
||
| 40 | """ |
||
| 41 | |||
| 42 | self._zmq_lockstep_socket = None |
||
| 43 | """ |
||
| 44 | :type: zmq.sugar.socket.Socket |
||
| 45 | """ |
||
| 46 | |||
| 47 | self._zmq_spanner = None |
||
| 48 | """ |
||
| 49 | :type: Socket |
||
| 50 | """ |
||
| 51 | |||
| 52 | self._zmq_logger = None |
||
| 53 | """ |
||
| 54 | :type: Socket |
||
| 55 | """ |
||
| 56 | |||
| 57 | self._host_resources = {} |
||
| 58 | """ |
||
| 59 | All resources defined at host level. |
||
| 60 | |||
| 61 | :type: dict |
||
| 62 | """ |
||
| 63 | |||
| 64 | self._schedules = {} |
||
| 65 | """ |
||
| 66 | All the current schedules. |
||
| 67 | |||
| 68 | :type: dict |
||
| 69 | """ |
||
| 70 | |||
| 71 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 72 | def _zmq_init(self): |
||
| 73 | self._zmq_context = zmq.Context() |
||
| 74 | |||
| 75 | # Create socket for asynchronous incoming messages. |
||
| 76 | self._zmq_pull_socket = self._zmq_context.socket(zmq.PULL) |
||
| 77 | self._zmq_pull_socket.bind(enarksh.CONTROLLER_PULL_END_POINT) |
||
| 78 | |||
| 79 | # Create socket for lockstep incoming messages. |
||
| 80 | self._zmq_lockstep_socket = self._zmq_context.socket(zmq.REP) |
||
| 81 | self._zmq_lockstep_socket.bind(enarksh.CONTROLLER_LOCKSTEP_END_POINT) |
||
| 82 | |||
| 83 | # Create socket for sending asynchronous messages to the spanner. |
||
| 84 | self._zmq_spanner = self._zmq_context.socket(zmq.PUSH) |
||
|
1 ignored issue
–
show
|
|||
| 85 | self._zmq_spanner.connect(enarksh.SPAWNER_PULL_END_POINT) |
||
| 86 | |||
| 87 | # Create socket for sending asynchronous messages to the logger. |
||
| 88 | self._zmq_logger = self._zmq_context.socket(zmq.PUSH) |
||
|
1 ignored issue
–
show
|
|||
| 89 | self._zmq_logger.connect(enarksh.LOGGER_PULL_END_POINT) |
||
| 90 | |||
| 91 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 92 | @staticmethod |
||
| 93 | def daemonize(): |
||
| 94 | enarksh.daemonize(enarksh.HOME + '/var/lock/controllerd.pid', |
||
| 95 | '/dev/null', |
||
| 96 | enarksh.HOME + '/var/log/controllerd.log', |
||
| 97 | enarksh.HOME + '/var/log/controllerd.log') |
||
| 98 | |||
| 99 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 100 | @staticmethod |
||
| 101 | def _set_unprivileged_user(): |
||
| 102 | """ |
||
| 103 | Set the real and effective user and group to an unprivileged user. |
||
| 104 | """ |
||
| 105 | _, _, uid, gid, _, _, _ = pwd.getpwnam('enarksh') |
||
| 106 | |||
| 107 | os.setresgid(gid, gid, 0) |
||
| 108 | os.setresuid(uid, uid, 0) |
||
| 109 | |||
| 110 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 111 | def _create_host_resources(self): |
||
| 112 | """ |
||
| 113 | Creates resources defined at host level. |
||
| 114 | """ |
||
| 115 | resources_data = DataLayer.enk_back_get_host_resources() |
||
| 116 | for resource_data in resources_data: |
||
| 117 | self._host_resources[resource_data['rsc_id']] = Resource.create_resource(resource_data) |
||
| 118 | |||
| 119 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 120 | def _startup(self): |
||
| 121 | """ |
||
| 122 | Performs the necessary actions for starting the controller daemon. |
||
| 123 | """ |
||
| 124 | print('Start controller.') |
||
| 125 | |||
| 126 | # Set database configuration options. |
||
| 127 | DataLayer.config['host'] = enarksh.MYSQL_HOSTNAME |
||
| 128 | DataLayer.config['user'] = enarksh.MYSQL_USERNAME |
||
| 129 | DataLayer.config['password'] = enarksh.MYSQL_PASSWORD |
||
| 130 | DataLayer.config['database'] = enarksh.MYSQL_SCHEMA |
||
| 131 | DataLayer.config['port'] = enarksh.MYSQL_PORT |
||
| 132 | DataLayer.config['autocommit'] = False |
||
| 133 | |||
| 134 | # Connect to the MySQL. |
||
| 135 | DataLayer.connect() |
||
| 136 | |||
| 137 | # Sanitise the data in the database. |
||
| 138 | DataLayer.enk_back_controller_init() |
||
| 139 | |||
| 140 | # Create resources defined at host level. |
||
| 141 | self._create_host_resources() |
||
| 142 | |||
| 143 | # Set the effective user and group to an unprivileged user and group. |
||
| 144 | # self._set_unprivileged_user() |
||
| 145 | |||
| 146 | # Become a daemon. |
||
| 147 | # self.__daemonize() |
||
| 148 | |||
| 149 | self._zmq_init() |
||
| 150 | |||
| 151 | # Housekeeping to track statuses of nodes. |
||
| 152 | Schedule.register_observer_schedule_termination(self.slot_schedule_termination) |
||
| 153 | |||
| 154 | # Commit transaction and close connection to MySQL. |
||
| 155 | DataLayer.commit() |
||
| 156 | DataLayer.disconnect() |
||
| 157 | |||
| 158 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 159 | def slot_schedule_termination(self, schedule, rst_id): |
||
| 160 | """ |
||
| 161 | :param enarksh.Controller.Schedule.Schedule schedule: |
||
| 162 | :param int rst_id: |
||
| 163 | """ |
||
| 164 | print("Schedule %s has terminated with status %s" % (schedule.get_sch_id(), rst_id)) |
||
| 165 | |||
| 166 | self._unload_schedule(schedule.get_sch_id()) |
||
| 167 | |||
| 168 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 169 | def _load_schedule(self, sch_id): |
||
| 170 | """ |
||
| 171 | :param int sch_id: |
||
| 172 | |||
| 173 | :rtype: enarksh.Controller.Schedule.Schedule |
||
| 174 | """ |
||
| 175 | print("Loading schedule '%s'." % sch_id) |
||
| 176 | |||
| 177 | # Load the schedule. |
||
| 178 | schedule = Schedule(sch_id, self._host_resources) |
||
| 179 | |||
| 180 | # Register the schedule. |
||
| 181 | self._schedules[sch_id] = schedule |
||
| 182 | |||
| 183 | return schedule |
||
| 184 | |||
| 185 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 186 | def _unload_schedule(self, sch_id): |
||
| 187 | """ |
||
| 188 | :param int sch_id: |
||
| 189 | """ |
||
| 190 | print("Unloading schedule '%s'." % sch_id) |
||
| 191 | |||
| 192 | if sch_id in self._schedules: |
||
| 193 | schedule = self._schedules[sch_id] |
||
| 194 | |||
| 195 | # Remove the schedule. |
||
| 196 | del self._schedules[sch_id] |
||
| 197 | |||
| 198 | schedule.destroy() |
||
| 199 | |||
| 200 | gc.collect() |
||
| 201 | |||
| 202 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 203 | def _reload_schedule(self, sch_id): |
||
| 204 | """ |
||
| 205 | :param int sch_id: |
||
| 206 | |||
| 207 | :rtype: enarksh.Controller.Schedule.Schedule |
||
| 208 | """ |
||
| 209 | self._unload_schedule(sch_id) |
||
| 210 | |||
| 211 | return self._load_schedule(sch_id) |
||
| 212 | |||
| 213 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 214 | @staticmethod |
||
| 215 | def queue_compare(schedule1, schedule2): |
||
| 216 | """ |
||
| 217 | Compares two schedules for sorting queued nodes. |
||
| 218 | |||
| 219 | :param enarksh.Controller.Schedule.Schedule schedule1: |
||
| 220 | :param enarksh.Controller.Schedule.Schedule schedule2: |
||
| 221 | |||
| 222 | :rtype: |
||
| 223 | """ |
||
| 224 | return -(schedule1.get_schedule_load() - schedule2.get_schedule_load) |
||
| 225 | |||
| 226 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 227 | def _queue_handler(self): |
||
| 228 | """ |
||
| 229 | Starts node that are queued and for which there are enough resources available. |
||
| 230 | """ |
||
| 231 | # Return immediately if there are no loaded schedules. |
||
| 232 | if not self._schedules: |
||
| 233 | return |
||
| 234 | |||
| 235 | DataLayer.connect() |
||
| 236 | DataLayer.start_transaction() |
||
| 237 | |||
| 238 | while True: |
||
| 239 | start = False |
||
| 240 | if self._schedules: |
||
| 241 | schedules = sorted(self._schedules, key=functools.cmp_to_key(Controller.queue_compare)) |
||
| 242 | schedule = self._schedules[schedules[0]] |
||
| 243 | queue = schedule.get_queue() |
||
| 244 | for node in queue: |
||
| 245 | # Inquire if there are enough resources available for the node. |
||
| 246 | start = node.inquire_resources() |
||
| 247 | if start: |
||
| 248 | span_job = node.start() |
||
| 249 | |||
| 250 | # If required send a message to the spanner. |
||
| 251 | if span_job: |
||
| 252 | message = node.get_start_message() |
||
| 253 | message['sch_id'] = schedule.get_sch_id() |
||
| 254 | self._zmq_spanner.send_json(message) |
||
| 255 | |||
| 256 | else: |
||
| 257 | node.stop(0) |
||
| 258 | |||
| 259 | # If a node has been started leave inner loop. |
||
| 260 | break |
||
| 261 | |||
| 262 | # If no node has been started leave the outer loop. |
||
| 263 | if not start: |
||
| 264 | break |
||
| 265 | |||
| 266 | # Commit the last transaction (if any) and close the connection to the database. |
||
| 267 | DataLayer.commit() |
||
| 268 | DataLayer.disconnect() |
||
| 269 | |||
| 270 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 271 | def _get_schedule_by_sch_id(self, sch_id): |
||
| 272 | """ |
||
| 273 | Returns a schedule. |
||
| 274 | |||
| 275 | :param int sch_id: The ID of the schedule. |
||
| 276 | |||
| 277 | :rtype: enarksh.Controller.Schedule.Schedule |
||
| 278 | """ |
||
| 279 | schedule = self._schedules.get(int(sch_id), None) |
||
| 280 | if not schedule: |
||
| 281 | # Load the schedule if the schedule is not currently loaded. |
||
| 282 | schedule = self._load_schedule(sch_id) |
||
| 283 | |||
| 284 | return schedule |
||
| 285 | |||
| 286 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 287 | def _get_possible_node_actions(self, sch_id, rnd_id): |
||
| 288 | """ |
||
| 289 | Returns the possible actions for a node. |
||
| 290 | |||
| 291 | :param int sch_id: The ID of the schedule of the node. |
||
| 292 | :param int rnd_id: The ID of the node. |
||
| 293 | |||
| 294 | :rtype dict[str, bool|dict[int, dict[str, mixed]]]: Dictionary with possible node actions. |
||
| 295 | """ |
||
| 296 | message = {'actions': {enarksh.ENK_ACT_ID_TRIGGER: {'act_id': enarksh.ENK_ACT_ID_TRIGGER, |
||
| 297 | 'act_title': 'Trigger', |
||
| 298 | 'act_enabled': False}, |
||
| 299 | enarksh.ENK_ACT_ID_RESTART: {'act_id': enarksh.ENK_ACT_ID_RESTART, |
||
| 300 | 'act_title': 'Restart', |
||
| 301 | 'act_enabled': False}, |
||
| 302 | enarksh.ENK_ACT_ID_RESTART_FAILED: {'act_id': enarksh.ENK_ACT_ID_RESTART_FAILED, |
||
| 303 | 'act_title': 'Restart Failed', |
||
| 304 | 'act_enabled': False}}, |
||
| 305 | 'mail_on_completion': False, |
||
| 306 | 'mail_on_error': False} |
||
| 307 | |||
| 308 | # Find the schedule of the node. |
||
| 309 | schedule = self._get_schedule_by_sch_id(sch_id) |
||
| 310 | if not schedule: |
||
| 311 | # Node rnd_id is not part of a current run of a current schedule revision. |
||
| 312 | return message |
||
| 313 | |||
| 314 | return schedule.request_possible_node_actions(rnd_id, message) |
||
| 315 | |||
| 316 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 317 | def _message_handler_request_possible_node_actions(self, message): |
||
| 318 | """ |
||
| 319 | Handles a request (from the web interface) for possible actions of a certain node. |
||
| 320 | |||
| 321 | :param dict message: The message of the request. |
||
| 322 | """ |
||
| 323 | sch_id = int(message['sch_id']) |
||
| 324 | rnd_id = int(message['rnd_id']) |
||
| 325 | |||
| 326 | response = self._get_possible_node_actions(sch_id, rnd_id) |
||
| 327 | |||
| 328 | # Send the message to the web interface. |
||
| 329 | self._zmq_lockstep_socket.send_json(response) |
||
| 330 | |||
| 331 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 332 | def _message_handler_schedule_definition(self, message): |
||
| 333 | """ |
||
| 334 | Handles a request (from the operator) for loading a new schedule. |
||
| 335 | |||
| 336 | :param dict message: The message of the request. |
||
| 337 | """ |
||
| 338 | try: |
||
| 339 | # Validate XML against XSD. |
||
| 340 | reader = XmlReader() |
||
| 341 | schedule = reader.parse_schedule(message['xml'], message['filename']) |
||
| 342 | |||
| 343 | # Test schedule is currently running. |
||
| 344 | name = schedule.name |
||
| 345 | if name in self._schedules: |
||
| 346 | raise Exception("Schedule '%s' is currently running." % name) |
||
| 347 | |||
| 348 | # Insert the XML definition as BLOB in tot the database. |
||
| 349 | blb_id = DataLayer.enk_blob_insert_blob(os.path.basename(message['filename']), 'text/xml', message['xml']) |
||
| 350 | srv_id = DataLayer.enk_reader_schedule_create_revision(blb_id, name) |
||
| 351 | if not srv_id: |
||
| 352 | raise Exception("Schedule '%s' is already loaded." % name) |
||
| 353 | |||
| 354 | # Store the new schedule definition into the database. |
||
| 355 | schedule.store(srv_id, 1) |
||
| 356 | DataLayer.enk_back_schedule_revision_create_run(srv_id) |
||
| 357 | |||
| 358 | response = {'ret': 0, |
||
| 359 | 'message': "Schedule '%s' successfully loaded." % name} |
||
| 360 | except Exception as exception: |
||
| 361 | print(exception, file=sys.stderr) |
||
| 362 | response = {'ret': -1, |
||
| 363 | 'message': str(exception)} |
||
| 364 | traceback.print_exc(file=sys.stderr) |
||
| 365 | |||
| 366 | self._zmq_lockstep_socket.send_json(response) |
||
| 367 | |||
| 368 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 369 | def _message_handler_dynamic_worker_definition(self, message): |
||
| 370 | """ |
||
| 371 | Handles a request for loading a dynamic worker definition. |
||
| 372 | |||
| 373 | :param dict message: The message of the request. |
||
| 374 | """ |
||
| 375 | try: |
||
| 376 | sch_id = int(message['sch_id']) |
||
| 377 | rnd_id = int(message['rnd_id']) |
||
| 378 | |||
| 379 | print('rnd_id: ' + str(rnd_id)) |
||
| 380 | |||
| 381 | # Get info of the dynamic node. |
||
| 382 | info = DataLayer.enk_back_run_node_get_dynamic_info_by_generator(rnd_id) |
||
| 383 | |||
| 384 | schedule = self._get_schedule_by_sch_id(sch_id) |
||
| 385 | parent = FakeParent(schedule, |
||
| 386 | self._host_resources, |
||
| 387 | info['nod_id_outer_worker'], |
||
| 388 | info['rnd_id_outer_worker']) |
||
| 389 | |||
| 390 | # Validate XML against XSD. |
||
| 391 | reader = XmlReader() |
||
| 392 | inner_worker = reader.parse_dynamic_worker(message['definition'], parent) |
||
| 393 | name = inner_worker.name |
||
| 394 | |||
| 395 | # Note: Dynamic node is the parent of the worker node which is the parent of the inner worker node. |
||
| 396 | inner_worker.set_levels(info['nod_recursion_level']+2) |
||
| 397 | |||
| 398 | # Store the dynamically defined inner worker node. |
||
| 399 | inner_worker.store(info['srv_id'], 0) |
||
| 400 | |||
| 401 | # Create dependencies between the input and output port of the worker node and its child node(s). |
||
| 402 | DataLayer.enk_back_node_dynamic_add_dependencies(info['nod_id_outer_worker'], inner_worker.nod_id) |
||
| 403 | |||
| 404 | # XXX trigger reload of front end |
||
| 405 | |||
| 406 | # Unload the schedule to force a reload of the schedule with new nodes added. |
||
| 407 | self._unload_schedule(sch_id) |
||
| 408 | |||
| 409 | response = {'ret': 0, |
||
| 410 | 'message': "Worker '%s' successfully loaded." % name} |
||
| 411 | |||
| 412 | except Exception as exception: |
||
| 413 | print(exception, file=sys.stderr) |
||
| 414 | response = {'ret': -1, |
||
| 415 | 'message': str(exception)} |
||
| 416 | traceback.print_exc(file=sys.stderr) |
||
| 417 | |||
| 418 | self._zmq_lockstep_socket.send_json(response) |
||
| 419 | |||
| 420 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 421 | def _message_handler_request_node_action(self, message): |
||
| 422 | """ |
||
| 423 | Executes a node action request. |
||
| 424 | |||
| 425 | :param dict message: dict The message of the request. |
||
| 426 | """ |
||
| 427 | # Compose a response message for the web interface. |
||
| 428 | response = {'ret': 0, |
||
| 429 | 'new_run': 0, |
||
| 430 | 'message': 'OK'} |
||
| 431 | |||
| 432 | try: |
||
| 433 | sch_id = int(message['sch_id']) |
||
| 434 | rnd_id = int(message['rnd_id']) |
||
| 435 | act_id = int(message['act_id']) |
||
| 436 | |||
| 437 | actions = self._get_possible_node_actions(sch_id, rnd_id) |
||
| 438 | if act_id not in actions['actions'] or not actions['actions'][act_id]['act_enabled']: |
||
| 439 | response['ret'] = -1 |
||
| 440 | response['message'] = 'Not a valid action' |
||
| 441 | else: |
||
| 442 | schedule = self._get_schedule_by_sch_id(sch_id) |
||
| 443 | reload = schedule.request_node_action(rnd_id, |
||
| 444 | act_id, |
||
| 445 | message['usr_login'], |
||
| 446 | (message['mail_on_completion']), |
||
| 447 | (message['mail_on_error'])) |
||
| 448 | if reload: |
||
| 449 | # Schedule must be reloaded. |
||
| 450 | schedule = self._reload_schedule(schedule.sch_id) |
||
| 451 | # A reload is only required when the schedule is been triggered. However, this trigger is lost by |
||
| 452 | # reloading the schedule. So, resend the trigger. |
||
| 453 | schedule.request_node_action(schedule.get_activate_node().get_rnd_id(), |
||
| 454 | act_id, |
||
| 455 | message['usr_login'], |
||
| 456 | (message['mail_on_completion']), |
||
| 457 | (message['mail_on_error'])) |
||
| 458 | |||
| 459 | if act_id == enarksh.ENK_ACT_ID_TRIGGER: |
||
| 460 | response['new_run'] = 1 |
||
| 461 | |||
| 462 | except Exception as exception: |
||
| 463 | print(exception, file=sys.stderr) |
||
| 464 | traceback.print_exc(file=sys.stderr) |
||
| 465 | |||
| 466 | response['ret'] = -1 |
||
| 467 | response['message'] = 'Internal error' |
||
| 468 | |||
| 469 | # Send the message to the web interface. |
||
| 470 | self._zmq_lockstep_socket.send_json(response) |
||
| 471 | |||
| 472 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 473 | def _message_handler_node_stop(self, message): |
||
| 474 | """ |
||
| 475 | Handles a message sent by the spanner after a job has finished. |
||
| 476 | |||
| 477 | :param dict message: |
||
| 478 | """ |
||
| 479 | schedule = self._get_schedule_by_sch_id(message['sch_id']) |
||
| 480 | schedule.event_node_stop(message['rnd_id'], message['exit_status']) |
||
| 481 | |||
| 482 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 483 | def _message_handler(self): |
||
| 484 | """ |
||
| 485 | Waits for a new messages and processes these messages. |
||
| 486 | """ |
||
| 487 | poller = zmq.Poller() |
||
| 488 | poller.register(self._zmq_pull_socket, zmq.POLLIN) |
||
| 489 | poller.register(self._zmq_lockstep_socket, zmq.POLLIN) |
||
| 490 | |||
| 491 | socks = dict(poller.poll()) |
||
| 492 | |||
| 493 | DataLayer.connect() |
||
| 494 | DataLayer.start_transaction() |
||
| 495 | |||
| 496 | if self._zmq_pull_socket in socks: |
||
| 497 | message = self._zmq_pull_socket.recv_json() |
||
| 498 | |||
| 499 | if message['type'] == 'node_stop': |
||
| 500 | self._message_handler_node_stop(message) |
||
| 501 | |||
| 502 | else: |
||
| 503 | raise Exception("Unknown message type '%s'." % message['type']) |
||
| 504 | |||
| 505 | if self._zmq_lockstep_socket in socks: |
||
| 506 | message = self._zmq_lockstep_socket.recv_json() |
||
| 507 | |||
| 508 | if message['type'] == 'schedule_definition': |
||
| 509 | self._message_handler_schedule_definition(message) |
||
| 510 | |||
| 511 | elif message['type'] == 'request_node_action': |
||
| 512 | self._message_handler_request_node_action(message) |
||
| 513 | |||
| 514 | elif message['type'] == 'request_possible_node_actions': |
||
| 515 | self._message_handler_request_possible_node_actions(message) |
||
| 516 | |||
| 517 | elif message['type'] == 'dynamic_worker_definition': |
||
| 518 | self._message_handler_dynamic_worker_definition(message) |
||
| 519 | |||
| 520 | else: |
||
| 521 | raise Exception("Unknown message type '%s'." % message['type']) |
||
| 522 | |||
| 523 | DataLayer.commit() |
||
| 524 | DataLayer.disconnect() |
||
| 525 | |||
| 526 | # ------------------------------------------------------------------------------------------------------------------ |
||
| 527 | def main(self): |
||
| 528 | """ |
||
| 529 | The main function of the job controller. |
||
| 530 | """ |
||
| 531 | # Perform the necessary actions for starting the controller. |
||
| 532 | self._startup() |
||
| 533 | |||
| 534 | while True: |
||
| 535 | try: |
||
| 536 | # Wait for new messages and process these messages. |
||
| 537 | self._message_handler() |
||
| 538 | |||
| 539 | # Try to start nodes that are queued. |
||
| 540 | self._queue_handler() |
||
| 541 | |||
| 542 | except Exception as exception1: |
||
| 543 | try: |
||
| 544 | DataLayer.rollback() |
||
| 545 | DataLayer.disconnect() |
||
| 546 | except Exception as exception2: |
||
| 547 | print(exception2, file=sys.stderr) |
||
| 548 | traceback.print_exc(file=sys.stderr) |
||
| 549 | print(exception1, file=sys.stderr) |
||
| 550 | traceback.print_exc(file=sys.stderr) |
||
| 551 | |||
| 554 |
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.