| Conditions | 21 |
| Total Lines | 117 |
| Code Lines | 88 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, particularly when combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
Complex classes like torchio.visualization.make_video() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | from __future__ import annotations |
||
def make_video(
    image: ScalarImage,
    output_path: TypePath,
    seconds: float | None = None,
    frame_rate: float | None = None,
    direction: str = 'I',
    verbosity: str = 'error',
) -> None:
    """Encode the slices of a single-channel image as an H.265 ``.mp4`` video.

    The image is preprocessed as needed before encoding, emitting a
    ``RuntimeWarning`` for each adjustment: intensities outside ``[0, 255]``
    are rescaled, non-``uint8`` data is cast, unequal in-plane spacings are
    resampled to the smaller of the two, and odd heights/widths are cropped
    to even numbers. Frames are then piped as raw grayscale bytes to ffmpeg.

    Args:
        image: Image to encode. Must have exactly one channel.
        output_path: Destination file. Its suffix must be ``.mp4``
            (case-insensitive). An existing file is overwritten.
        seconds: Duration of the video. The frame rate is derived as the
            number of frames divided by this value. Mutually exclusive
            with ``frame_rate``.
        frame_rate: Frames per second. Mutually exclusive with ``seconds``.
        direction: Anatomical direction in which the volume is traversed,
            one of ``'I'``, ``'S'``, ``'A'``, ``'P'``, ``'R'`` or ``'L'``
            (case-insensitive).
        verbosity: Log level forwarded to both ffmpeg (``loglevel``) and the
            x265 encoder (``log-level``).

    Raises:
        ValueError: If neither or both of ``seconds`` and ``frame_rate`` are
            given, if the given one is not positive, if the image has more
            than one channel, or if ``direction`` is not recognized.
        NotImplementedError: If ``output_path`` does not end in ``.mp4``.
        RuntimeError: If the ffmpeg process exits with a non-zero status.
    """
    ffmpeg = get_ffmpeg()

    if seconds is None and frame_rate is None:
        message = 'Either seconds or frame_rate must be provided.'
        raise ValueError(message)
    if seconds is not None and frame_rate is not None:
        message = 'Provide either seconds or frame_rate, not both.'
        raise ValueError(message)
    # Reject non-positive timings up front: seconds == 0 would otherwise
    # surface later as a bare ZeroDivisionError, and negative values would
    # hand ffmpeg a meaningless frame rate.
    if seconds is not None and seconds <= 0:
        message = f'seconds must be positive. Got {seconds!r}.'
        raise ValueError(message)
    if frame_rate is not None and frame_rate <= 0:
        message = f'frame_rate must be positive. Got {frame_rate!r}.'
        raise ValueError(message)
    if image.num_channels > 1:
        message = 'Only single-channel tensors are supported for video output for now.'
        raise ValueError(message)
    tmin, tmax = image.data.min(), image.data.max()
    if tmin < 0 or tmax > 255:
        message = (
            'The tensor must be in the range [0, 256) for video output.'
            ' The image data will be rescaled to this range.'
        )
        warnings.warn(message, RuntimeWarning, stacklevel=2)
        image = RescaleIntensity((0, 255))(image)
    if image.data.dtype != torch.uint8:
        message = (
            'Only uint8 tensors are supported for video output. The image data'
            ' will be cast to uint8.'
        )
        warnings.warn(message, RuntimeWarning, stacklevel=2)
        image = To(torch.uint8)(image)

    # Reorient so the output looks like in typical visualization software
    orientations = {
        'I': 'IPL',  # axial top to bottom
        'S': 'SPL',  # axial bottom to top
        'A': 'AIL',  # coronal back to front
        'P': 'PIL',  # coronal front to back
        'R': 'RIP',  # sagittal left to right
        'L': 'LIP',  # sagittal right to left
    }
    direction = direction.upper()
    if direction not in orientations:
        message = (
            'Direction must be one of "I", "S", "P", "A", "R" or "L".'
            f' Got {direction!r}.'
        )
        raise ValueError(message)
    image = ToOrientation(orientations[direction])(image)

    # Check in-plane isotropy
    spacing_f, spacing_h, spacing_w = image.spacing
    if spacing_h != spacing_w:
        # Compute the target first so the warning reports the spacing that is
        # actually applied in-plane (the frame-axis spacing is left unchanged).
        spacing_iso = min(spacing_h, spacing_w)
        message = (
            'The height and width spacings should be the same for video output.'
            f' Got {spacing_h:.2f} and {spacing_w:.2f}.'
            f' Resampling both to {spacing_iso:.2f}.'
        )
        warnings.warn(message, RuntimeWarning, stacklevel=2)
        target_spacing = spacing_f, spacing_iso, spacing_iso
        image = Resample(target_spacing)(image)  # type: ignore[assignment]

    # Check that height and width are multiples of 2 for H.265 encoding
    num_frames, height, width = image.spatial_shape
    if height % 2 != 0 or width % 2 != 0:
        message = (
            f'The height ({height}) and width ({width}) must be even.'
            ' The image will be cropped to the nearest even number.'
        )
        warnings.warn(message, RuntimeWarning, stacklevel=2)
        # The multiple along the frame axis is 1, so num_frames stays valid
        image = EnsureShapeMultiple((1, 2, 2), method='crop')(image)

    if seconds is not None:
        frame_rate = num_frames / seconds

    output_path = Path(output_path)
    if output_path.suffix.lower() != '.mp4':
        message = 'Only .mp4 files are supported for video output.'
        raise NotImplementedError(message)

    # Drop the channel axis; re-read the frame size because it may have
    # changed after cropping
    frames = image.numpy()[0]
    height, width = frames[0].shape

    process = (
        ffmpeg.input(
            'pipe:',
            format='rawvideo',
            pix_fmt='gray',
            s=f'{width}x{height}',
            framerate=frame_rate,
        )
        .output(
            str(output_path),
            vcodec='libx265',
            pix_fmt='yuv420p',
            loglevel=verbosity,
            **{'x265-params': f'log-level={verbosity}'},
        )
        .overwrite_output()
        .run_async(pipe_stdin=True)
    )

    # Close the pipe and reap the child even if a write fails (e.g. ffmpeg
    # exited early), so no process or file descriptor is leaked.
    try:
        with process.stdin as pipe:
            for frame in frames:
                pipe.write(frame.tobytes())
    finally:
        return_code = process.wait()

    # A non-zero exit status means the encoding failed; do not report success
    if return_code != 0:
        message = (
            f'ffmpeg exited with code {return_code}'
            f' while writing "{output_path}".'
        )
        raise RuntimeError(message)
||
| 411 |