|
@@ 242-267 (lines=26) @@
|
| 239 |
|
|
| 240 |
|
def _get_captured_output(self): |
| 241 |
|
if self._output_file.closed: |
| 242 |
|
return # nothing we can do |
| 243 |
|
flush() |
| 244 |
|
self._output_file.flush() |
| 245 |
|
self._output_file.seek(0) |
| 246 |
|
text = self._output_file.read().decode() |
| 247 |
|
if self.captured_out_filter is not None: |
| 248 |
|
text = self.captured_out_filter(text) |
| 249 |
|
self.captured_out = text |
| 250 |
|
|
| 251 |
|
def _start_heartbeat(self): |
| 252 |
|
self._emit_heartbeat() |
| 253 |
|
if self.beat_interval > 0: |
| 254 |
|
self._heartbeat = threading.Timer(self.beat_interval, |
| 255 |
|
self._start_heartbeat) |
| 256 |
|
self._heartbeat.start() |
| 257 |
|
|
| 258 |
|
def _stop_heartbeat(self): |
| 259 |
|
if self._heartbeat is not None: |
| 260 |
|
self._heartbeat.cancel() |
| 261 |
|
self._heartbeat = None |
| 262 |
|
self._emit_heartbeat() # one final beat to flush pending changes |
| 263 |
|
|
| 264 |
|
def _emit_queued(self): |
| 265 |
|
self.status = 'QUEUED' |
| 266 |
|
queue_time = datetime.datetime.utcnow() |
| 267 |
|
self.meta_info['queue_time'] = queue_time |
| 268 |
|
command = join_paths(self.main_function.prefix, |
| 269 |
|
self.main_function.signature.name) |
| 270 |
|
self.run_logger.info("Queuing-up command '%s'", command) |
|
@@ 269-293 (lines=25) @@
|
| 266 |
|
queue_time = datetime.datetime.utcnow() |
| 267 |
|
self.meta_info['queue_time'] = queue_time |
| 268 |
|
command = join_paths(self.main_function.prefix, |
| 269 |
|
self.main_function.signature.name) |
| 270 |
|
self.run_logger.info("Queuing-up command '%s'", command) |
| 271 |
|
for observer in self.observers: |
| 272 |
|
if hasattr(observer, 'queued_event'): |
| 273 |
|
_id = observer.queued_event( |
| 274 |
|
ex_info=self.experiment_info, |
| 275 |
|
command=command, |
| 276 |
|
queue_time=queue_time, |
| 277 |
|
config=self.config, |
| 278 |
|
meta_info=self.meta_info, |
| 279 |
|
_id=self._id |
| 280 |
|
) |
| 281 |
|
if self._id is None: |
| 282 |
|
self._id = _id |
| 283 |
|
# do not catch any exceptions on startup: |
| 284 |
|
# the experiment SHOULD fail if any of the observers fails |
| 285 |
|
|
| 286 |
|
if self._id is None: |
| 287 |
|
self.run_logger.info('Queued') |
| 288 |
|
else: |
| 289 |
|
self.run_logger.info('Queued-up run with ID "{}"'.format(self._id)) |
| 290 |
|
|
| 291 |
|
def _emit_started(self): |
| 292 |
|
self.status = 'RUNNING' |
| 293 |
|
self.start_time = datetime.datetime.utcnow() |
| 294 |
|
command = join_paths(self.main_function.prefix, |
| 295 |
|
self.main_function.signature.name) |
| 296 |
|
self.run_logger.info("Running command '%s'", command) |