|
@@ 242-267 (lines=26) @@
|
| 239 |
|
self._stop_heartbeat() |
| 240 |
|
status = getattr(e, 'STATUS', 'INTERRUPTED') |
| 241 |
|
self._emit_interrupted(status) |
| 242 |
|
raise |
| 243 |
|
except: |
| 244 |
|
exc_type, exc_value, trace = sys.exc_info() |
| 245 |
|
self._stop_heartbeat() |
| 246 |
|
self._emit_failed(exc_type, exc_value, trace.tb_next) |
| 247 |
|
raise |
| 248 |
|
finally: |
| 249 |
|
self._warn_about_failed_observers() |
| 250 |
|
|
| 251 |
|
return self.result |
| 252 |
|
|
| 253 |
|
def _get_captured_output(self):
    """Pull newly written output from the capture file into ``captured_out``.

    Reads everything located after ``self.captured_out_cursor`` in
    ``self._output_file``, appends it to the text captured so far, runs the
    combined text through ``self.captured_out_filter`` (when one is set) and
    stores the result in ``self.captured_out``.

    Returns ``None``; does nothing when the capture file is already closed.
    """
    if self._output_file.closed:
        return  # nothing we can do
    # NOTE(review): flush() is defined outside this block; presumably it
    # pushes buffered stream data into the capture file -- confirm.
    flush()
    self._output_file.flush()
    self._output_file.seek(self.captured_out_cursor)
    text = self._output_file.read()
    # Advance the cursor by what was actually read from the file, *before*
    # decoding.  For a binary file seek() works in bytes, and the decoded
    # string is shorter than the byte string whenever the output contains
    # multi-byte characters; counting decoded characters would make the next
    # seek() land too early and re-read (or split) already captured data.
    self.captured_out_cursor += len(text)
    if isinstance(text, bytes):
        text = text.decode()
    if self.captured_out:
        text = self.captured_out + text
    if self.captured_out_filter is not None:
        # The filter receives the full accumulated text, not only the new part.
        text = self.captured_out_filter(text)
    self.captured_out = text
| 268 |
|
|
| 269 |
|
def _start_heartbeat(self): |
| 270 |
|
self._emit_heartbeat() |
|
@@ 269-293 (lines=25) @@
|
| 266 |
|
text = self.captured_out_filter(text) |
| 267 |
|
self.captured_out = text |
| 268 |
|
|
| 269 |
|
def _start_heartbeat(self):
    """Emit a heartbeat now and schedule the next one.

    A heartbeat is always emitted immediately.  When ``self.beat_interval``
    is greater than zero, a ``threading.Timer`` is stored in
    ``self._heartbeat`` and started; it calls this method again after
    ``beat_interval`` seconds, so every beat schedules its successor.
    """
    self._emit_heartbeat()
    if not self.beat_interval > 0:
        return  # no periodic beats requested
    self._heartbeat = threading.Timer(
        interval=self.beat_interval,
        function=self._start_heartbeat,
    )
    self._heartbeat.start()
| 275 |
|
|
| 276 |
|
def _stop_heartbeat(self):
    """Cancel the scheduled heartbeat, if any, and emit a last beat.

    Any timer held in ``self._heartbeat`` is cancelled and the attribute is
    reset to ``None``.  A heartbeat is then emitted unconditionally.
    """
    pending = self._heartbeat
    if pending is not None:
        pending.cancel()
    self._heartbeat = None
    # one final beat to flush pending changes
    self._emit_heartbeat()
| 281 |
|
|
| 282 |
|
def _emit_queued(self): |
| 283 |
|
self.status = 'QUEUED' |
| 284 |
|
queue_time = datetime.datetime.utcnow() |
| 285 |
|
self.meta_info['queue_time'] = queue_time |
| 286 |
|
command = join_paths(self.main_function.prefix, |
| 287 |
|
self.main_function.signature.name) |
| 288 |
|
self.run_logger.info("Queuing-up command '%s'", command) |
| 289 |
|
for observer in self.observers: |
| 290 |
|
if hasattr(observer, 'queued_event'): |
| 291 |
|
_id = observer.queued_event( |
| 292 |
|
ex_info=self.experiment_info, |
| 293 |
|
command=command, |
| 294 |
|
queue_time=queue_time, |
| 295 |
|
config=self.config, |
| 296 |
|
meta_info=self.meta_info, |