ocrd.processor.base.Processor.process_page_file()   F
last analyzed

Complexity

Conditions 14

Size

Total Lines 81
Code Lines 61

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 61
dl 0
loc 81
rs 3.6
c 0
b 0
f 0
cc 14
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex methods like ocrd.processor.base.Processor.process_page_file() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
Processor base class and helper functions.
"""

# Public API of this module (including names re-exported from .helpers).
__all__ = [
    'Processor',
    'generate_processor_help',
    'run_cli',
    'run_processor'
]
11
12
from functools import cached_property
13
from os.path import exists, join
14
from shutil import copyfileobj
15
import json
16
import os
17
from os import getcwd
18
from pathlib import Path
19
from typing import Dict, List, Optional, Tuple, Union, get_args
20
import sys
21
import logging
22
import logging.handlers
23
import inspect
24
import tarfile
25
import io
26
from collections import defaultdict
27
from frozendict import frozendict
28
# concurrent.futures is buggy in py38,
29
# this is where the fixes came from:
30
from loky import Future, ProcessPoolExecutor
31
import multiprocessing as mp
32
from multiprocessing.pool import ThreadPool
33
34
from click import wrap_text
35
from deprecated import deprecated
36
from requests import HTTPError
37
38
from ..workspace import Workspace
39
from ..mets_server import ClientSideOcrdMets
40
from ocrd_models.ocrd_file import OcrdFileType
41
from .ocrd_page_result import OcrdPageResult
42
from ocrd_utils import (
43
    VERSION as OCRD_VERSION,
44
    MIMETYPE_PAGE,
45
    MIME_TO_EXT,
46
    config,
47
    getLogger,
48
    list_resource_candidates,
49
    pushd_popd,
50
    list_all_resources,
51
    get_processor_resource_types,
52
    resource_filename,
53
    parse_json_file_with_comments,
54
    make_file_id,
55
    deprecation_warning
56
)
57
from ocrd_validators import ParameterValidator
58
from ocrd_models.ocrd_page import (
59
    PageType,
60
    AlternativeImageType,
61
    MetadataItemType,
62
    LabelType,
63
    LabelsType,
64
    OcrdPage,
65
    to_xml,
66
)
67
from ocrd_modelfactory import page_from_file
68
from ocrd_validators.ocrd_tool_validator import OcrdToolValidator
69
70
# XXX imports must remain for backwards-compatibility
71
from .helpers import run_cli, run_processor  # pylint: disable=unused-import
72
73
74
class ResourceNotFoundError(FileNotFoundError):
    """
    Raised when a requested processor resource cannot be resolved.
    """
    def __init__(self, name, executable):
        """Record the resource ``name`` and the ``executable`` it was requested for."""
        self.name = name
        self.executable = executable
        template = ("Could not find resource '{name}' for executable '{executable}'. "
                    "Try 'ocrd resmgr download {executable} {name}' to download this resource.")
        self.message = template.format(name=name, executable=executable)
        super().__init__(self.message)
85
86
87
class NonUniqueInputFile(ValueError):
    """
    Raised when the fileGrp / pageId / mimetype selector matches more than one
    PAGE file, or (lacking PAGE files) more than one image, or more than one
    file of the requested mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        # absent an explicit mimetype, the selector means "PAGE or image files"
        kind = mimetype or 'PAGE+image(s)'
        self.message = (f"Could not determine unique input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {kind}")
        super().__init__(self.message)
100
101
102
class MissingInputFile(ValueError):
    """
    Raised when the fileGrp / pageId / mimetype selector matches no PAGE file,
    no PAGE and no image file, or no file of the requested mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        # absent an explicit mimetype, the selector means "PAGE or image files"
        kind = mimetype or 'PAGE+image(s)'
        self.message = (f"Could not find input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {kind}")
        super().__init__(self.message)
115
116
117
class DummyFuture:
    """
    Minimal stand-in for `concurrent.futures.Future`: the callable and its
    arguments are merely stored at construction time and executed
    synchronously when :py:meth:`result` is requested.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def result(self):
        """Run the stored callable now and return its value."""
        fn, args, kwargs = self.fn, self.args, self.kwargs
        return fn(*args, **kwargs)
128
129
130
class DummyExecutor:
    """
    Minimal stand-in for `concurrent.futures.ProcessPoolExecutor` that runs
    everything immediately and synchronously in this process.
    """
    def __init__(self, initializer=None, initargs=(), **kwargs):
        """
        Run ``initializer(*initargs)`` right away, if given.

        Pool-related keyword arguments (``max_workers``, ``context``, ...)
        are accepted for signature compatibility but ignored.
        """
        # FIX: previously `initializer(*initargs)` was called unconditionally,
        # which raised TypeError when the default None was left in place;
        # the real executor also treats a None initializer as a no-op.
        if initializer is not None:
            initializer(*initargs)

    def shutdown(self, **kwargs):
        # allow gc to catch processor instance (unless cached)
        _page_worker_set_ctxt(None, None)

    def submit(self, fn, *args, **kwargs) -> DummyFuture:
        """Return a DummyFuture that will run ``fn`` on the first ``.result()`` call."""
        return DummyFuture(fn, *args, **kwargs)
144
145
146
TFuture = Union[DummyFuture, Future]
147
TExecutor = Union[DummyExecutor, ProcessPoolExecutor]
148
149
150
class Processor():
    """
    A processor is a tool that implements the uniform OCR-D
    `command-line interface for run-time data processing <https://ocr-d.de/en/spec/cli>`_.

    That is, it executes a single workflow step, or a combination of workflow steps,
    on the workspace (represented by local METS). It reads input files for all or selected
    physical pages of the input fileGrp(s), computes additional annotation, and writes output
    files for them into the output fileGrp(s). It may take a number of optional or mandatory
    parameters.
    """

    # Class-level resource limits: each is combined with the corresponding
    # OCRD_* config value by taking whichever is smaller; a negative value
    # means "no class-specific limit".
    max_instances: int = -1
    """
    maximum number of cached instances (ignored if negative), to be applied on top of
    :py:data:`~ocrd_utils.config.OCRD_MAX_PROCESSOR_CACHE` (i.e. whatever is smaller).

    (Override this if you know how many instances fit into memory - GPU / CPU RAM - at once.)
    """

    max_workers: int = -1
    """
    maximum number of processor forks for page-parallel processing (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_MAX_PARALLEL_PAGES` (i.e.
    whatever is smaller).

    (Override this if you know how many pages fit into processing units - GPU shaders / CPU cores
    - at once, or if your class already creates threads prior to forking, e.g. during ``setup``.)
    """

    max_page_seconds: int = -1
    """
    maximum number of seconds may be spent processing a single page (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_PROCESSING_PAGE_TIMEOUT`
    (i.e. whatever is smaller).

    (Override this if you know how costly this processor may be, irrespective of image size
    or complexity of the page.)
    """
189
190
    @property
    def metadata_filename(self) -> str:
        """
        Package-relative path of the ``ocrd-tool.json`` metadata file.

        Consulted by :py:data:`metadata_location` to resolve the absolute path.

        (Override this if ``ocrd-tool.json`` does not sit at the root of the
        module, e.g. under ``namespace/ocrd-tool.json`` or ``data/ocrd-tool.json``.)
        """
        return 'ocrd-tool.json'
201
202
    @cached_property
    def metadata_location(self) -> Path:
        """
        Absolute path of the ``ocrd-tool.json`` file as distributed with the package.

        Used by :py:data:`metadata_rawdict`.

        (Override if ``ocrd-tool.json`` is not distributed with the Python package.)

        Raises:
            Exception: if no module prefix with a loadable location can be found.
        """
        module = inspect.getmodule(self)
        module_tokens = module.__package__.split('.')
        # for namespace packages, we cannot just use the first token
        for i in range(len(module_tokens)):
            prefix = '.'.join(module_tokens[:i + 1])
            if sys.modules[prefix].__spec__.has_location:
                return resource_filename(prefix, self.metadata_filename)
        # FIX: the message used to be passed logging-style as ("%s", arg),
        # which Exception does not interpolate; format it explicitly instead
        raise Exception(f"cannot find top-level module prefix for {module.__package__}")
219
220
    @cached_property
    def metadata_rawdict(self) -> dict:
        """
        Raw (unvalidated, unexpanded) ``ocrd-tool.json`` dict contents of the package.

        Used by :py:data:`metadata`.

        (Override if ``ocrd-tool.json`` is not stored in a file at all.)
        """
        location = self.metadata_location
        return parse_json_file_with_comments(location)
230
231
    @cached_property
    def metadata(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of the package, according to the OCR-D
        `spec <https://ocr-d.de/en/spec/ocrd_tool>`_ for processor tools.

        After deserialisation, it also gets validated against the
        `schema <https://ocr-d.de/en/spec/ocrd_tool#definition>`_ with all defaults
        expanded.

        Used by :py:data:`ocrd_tool` and :py:data:`version`.

        (Override if you want to provide metadata programmatically instead of a
        JSON file.)
        """
        metadata = self.metadata_rawdict
        report = OcrdToolValidator.validate(metadata)
        if not report.is_valid:
            # schema errors make the tool "invalid"; mere warnings "problematic"
            severity = 'invalid' if report.errors else 'problematic'
            target = metadata.get('git_url', 'the website')
            self.logger.error(f"The ocrd-tool.json of this processor is {severity}:\n"
                              f"{report.to_xml()}.\nPlease open an issue at {target}.")
        return metadata
252
253
    @cached_property
    def version(self) -> str:
        """
        The program version of the package, i.e. the ``version`` entry
        of :py:data:`metadata`.

        (Override if you do not want to use the :py:data:`metadata` lookup
        mechanism.)
        """
        metadata = self.metadata
        return metadata['version']
263
264
    @cached_property
    def executable(self) -> str:
        """
        The executable name of this processor tool, derived from the
        filename of the outermost stack frame (i.e. the runtime entry point).

        Used by :py:data:`ocrd_tool` for lookup in :py:data:`metadata`.

        (Override if your entry-point name deviates from the ``executable``
        name, or the processor gets instantiated from another runtime.)
        """
        entrypoint_frame = inspect.stack()[-1]
        return os.path.basename(entrypoint_frame.filename)
276
277
    @cached_property
    def ocrd_tool(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of this processor tool:
        the :py:data:`executable` entry of the ``tools`` section
        of :py:data:`metadata`.

        (Override if you do not want to use the :py:data:`metadata` lookup
        mechanism.)
        """
        tools = self.metadata['tools']
        return tools[self.executable]
288
289
    @property
    def parameter(self) -> Optional[dict]:
        """the runtime parameter dict to be used by this processor (or None if not set yet)"""
        # _parameter only exists after the setter has run successfully
        return getattr(self, '_parameter', None)
295
296
    @parameter.setter
    def parameter(self, parameter: dict) -> None:
        """
        Validate ``parameter`` against the ocrd-tool schema, freeze it as the
        runtime parameter dict, and (re-)run :py:meth:`setup`.

        Raises:
            ValueError: if the parameters do not validate.
        """
        if self.parameter is not None:
            # re-parameterisation: tear down resources from the previous setup first.
            # NOTE(review): this happens before validation, so an invalid dict
            # leaves the processor shut down but with the old _parameter intact —
            # confirm that is intended
            self.shutdown()
        parameterValidator = ParameterValidator(self.ocrd_tool)
        report = parameterValidator.validate(parameter)
        if not report.is_valid:
            raise ValueError(f'Invalid parameters:\n{report.to_xml()}')
        # make parameter dict read-only
        self._parameter = frozendict(parameter)
        # (re-)run setup to load models etc
        self.setup()
308
309
    def __init__(
            self,
            # FIXME: remove in favor of process_workspace(workspace)
            workspace: Optional[Workspace],
            ocrd_tool=None,
            parameter=None,
            input_file_grp=None,
            output_file_grp=None,
            page_id=None,
            version=None
    ):
        """
        Instantiate, but do not setup (neither for processing nor other usage).
        If given, do parse and validate :py:data:`.parameter`.

        Args:
             workspace (:py:class:`~ocrd.Workspace`): The workspace to process. \
                 If not ``None``, then `chdir` to that directory.
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        Keyword Args:
             parameter (string): JSON of the runtime choices for ocrd-tool ``parameters``. \
                 Can be ``None`` even for processing, but then needs to be set before running.
             input_file_grp (string): comma-separated list of METS ``fileGrp`` used for input. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             output_file_grp (string): comma-separated list of METS ``fileGrp`` used for output. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             page_id (string): comma-separated list of METS physical ``page`` IDs to process \
                 (or empty for all pages). \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        """
        # each of the following kwargs is deprecated; passing a non-None value
        # emits a warning but is still honored for backwards compatibility
        if ocrd_tool is not None:
            deprecation_warning("Passing 'ocrd_tool' as keyword argument to Processor is deprecated - "
                                "use or override metadata/executable/ocrd-tool properties instead")
            self.ocrd_tool = ocrd_tool
            self.executable = ocrd_tool['executable']
        if version is not None:
            deprecation_warning("Passing 'version' as keyword argument to Processor is deprecated - "
                                "use or override metadata/version properties instead")
            self.version = version
        if workspace is not None:
            deprecation_warning("Passing a workspace argument other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.workspace = workspace
            # remember the caller's cwd before chdir-ing into the workspace
            self.old_pwd = getcwd()
            os.chdir(self.workspace.directory)
        if input_file_grp is not None:
            deprecation_warning("Passing an input_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.input_file_grp = input_file_grp
        if output_file_grp is not None:
            deprecation_warning("Passing an output_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.output_file_grp = output_file_grp
        if page_id is not None:
            deprecation_warning("Passing a page_id kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            # normalize empty string (= all pages) to None
            self.page_id = page_id or None
        self.download = config.OCRD_DOWNLOAD_INPUT
        #: The logger to be used by processor implementations.
        # `ocrd.processor.base` internals should use :py:attr:`self._base_logger`
        self.logger = getLogger(f'ocrd.processor.{self.__class__.__name__}')
        self._base_logger = getLogger('ocrd.processor.base')
        if parameter is not None:
            # triggers validation and setup() via the property setter
            self.parameter = parameter
        # workaround for deprecated#72 (@deprecated decorator does not work for subclasses):
        setattr(self, 'process', deprecated(
            version='3.0', reason='process() should be replaced '
            'with process_page_pcgts() or process_page_file() or process_workspace()')(
                getattr(self, 'process')))
382
383
    def __del__(self):
        """Log and shut down this instance when it is garbage-collected."""
        self._base_logger.debug("shutting down %s in %s", repr(self), mp.current_process().name)
        self.shutdown()
386
387
    def show_help(self, subcommand=None):
        """
        Print a usage description including the standard CLI and all of this processor's ocrd-tool
        parameters and docstrings.
        """
        help_text = generate_processor_help(self.ocrd_tool, processor_instance=self, subcommand=subcommand)
        print(help_text)
393
394
    def show_version(self):
        """
        Print information on this processor's version and OCR-D version.
        """
        print(f"Version {self.version}, ocrd/core {OCRD_VERSION}")
399
400
    def verify(self):
        """
        Verify that :py:attr:`input_file_grp` and :py:attr:`output_file_grp` fulfill the processor's requirements.

        Checks cardinality constraints declared in the ocrd-tool (if any), and
        existence (input) / absence (output) of the fileGrps in the workspace METS.

        (NOTE: validation is done via ``assert`` statements, so it is skipped
        entirely when running under ``python -O``.)
        """
        # verify input and output file groups in parameters
        assert self.input_file_grp is not None
        assert self.output_file_grp is not None
        input_file_grps = self.input_file_grp.split(',')
        output_file_grps = self.output_file_grp.split(',')

        def assert_file_grp_cardinality(grps: List[str], spec: Union[int, List[int]], msg):
            # spec is either an exact count (int) or a [minimum, maximum] pair;
            # non-positive values mean "unconstrained"
            if isinstance(spec, int):
                if spec > 0:
                    assert len(grps) == spec, msg % (len(grps), str(spec))
            else:
                assert isinstance(spec, list)
                minimum = spec[0]
                maximum = spec[1]
                if minimum > 0:
                    assert len(grps) >= minimum, msg % (len(grps), str(spec))
                if maximum > 0:
                    assert len(grps) <= maximum, msg % (len(grps), str(spec))
        # FIXME: enforce unconditionally as soon as grace period for deprecation is over
        if 'input_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(input_file_grps, self.ocrd_tool['input_file_grp_cardinality'],
                                        "Unexpected number of input file groups %d vs %s")
        if 'output_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(output_file_grps, self.ocrd_tool['output_file_grp_cardinality'],
                                        "Unexpected number of output file groups %d vs %s")
        # verify input and output file groups in METS
        for input_file_grp in input_file_grps:
            assert input_file_grp in self.workspace.mets.file_groups, \
                f"input fileGrp {input_file_grp} does not exist in workspace {self.workspace}"
        for output_file_grp in output_file_grps:
            # a pre-existing output fileGrp is only acceptable if the policy allows
            # overwriting/skipping, or if it has no files for the selected pages yet
            assert (output_file_grp not in self.workspace.mets.file_groups
                    or config.OCRD_EXISTING_OUTPUT in ['OVERWRITE', 'SKIP']
                    or not any(self.workspace.mets.find_files(
                        pageId=self.page_id, fileGrp=output_file_grp))), \
                    f"output fileGrp {output_file_grp} already exists in workspace {self.workspace}"
        # keep this for backwards compatibility:
        return True
441
442
    def dump_json(self):
        """
        Print :py:attr:`ocrd_tool` on stdout.
        """
        serialized = json.dumps(self.ocrd_tool, indent=True)
        print(serialized)
447
448
    def dump_module_dir(self):
        """
        Print :py:attr:`moduledir` on stdout.
        """
        moduledir = self.moduledir
        print(moduledir)
453
454
    def list_resources(self):
        """
        Find all installed resource files in the search paths and print their path names.
        """
        resources = self.list_all_resources()
        for path in resources:
            print(path)
460
461
    def setup(self) -> None:
        """
        Prepare the processor for actual data processing,
        prior to changing to the workspace directory but
        after parsing parameters.

        (Called automatically by the :py:attr:`parameter` setter.)

        (Override this to load models into memory etc.)
        """
        pass
470
471
    def shutdown(self) -> None:
        """
        Bring down the processor after data processing,
        after changing back from the workspace directory but
        before exiting (or setting up with different parameters).

        (Called automatically by ``__del__`` and by the :py:attr:`parameter`
        setter prior to re-parameterisation.)

        (Override this to unload models from memory etc.)
        """
        pass
480
481
    @deprecated(version='3.0', reason='process() should be replaced '
                'with process_page_pcgts() or process_page_file() or process_workspace()')
    def process(self) -> None:
        """
        Process all files of the :py:data:`workspace`
        from :py:data:`input_file_grp` to :py:data:`output_file_grp`
        for :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        (Deprecated legacy entry point: contains the main functionality
        and needs to be overridden by subclasses that still use it.)
        """
        raise NotImplementedError()
495
496
    def process_workspace(self, workspace: Workspace) -> None:
        """
        Process all files of the given ``workspace``,
        from the given :py:data:`input_file_grp`
        to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        Delegates to :py:meth:`.process_workspace_submit_tasks`
        and :py:meth:`.process_workspace_handle_tasks`.

        (This will iterate over pages and files, calling
        :py:meth:`.process_page_file` and handling exceptions.
        It should be overridden by subclasses to handle cases
        like post-processing or computation across pages.)
        """
        with pushd_popd(workspace.directory):
            self.workspace = workspace
            self.verify()
            try:
                # set up multitasking:
                # effective limits are min(config limit, class-level limit);
                # 0 disables parallelism / per-page timeout respectively
                max_workers = max(0, config.OCRD_MAX_PARALLEL_PAGES)
                if self.max_workers > 0 and self.max_workers < config.OCRD_MAX_PARALLEL_PAGES:
                    self._base_logger.info("limiting number of workers from %d to %d", max_workers, self.max_workers)
                    max_workers = self.max_workers
                if max_workers > 1:
                    # parallel workers share the METS only via a METS server
                    assert isinstance(workspace.mets, ClientSideOcrdMets), \
                        "OCRD_MAX_PARALLEL_PAGES>1 requires also using --mets-server-url"
                max_seconds = max(0, config.OCRD_PROCESSING_PAGE_TIMEOUT)
                if self.max_page_seconds > 0 and self.max_page_seconds < config.OCRD_PROCESSING_PAGE_TIMEOUT:
                    self._base_logger.info("limiting page timeout from %d to %d sec", max_seconds, self.max_page_seconds)
                    max_seconds = self.max_page_seconds

                if max_workers > 1:
                    executor_cls = ProcessPoolExecutor
                    log_queue = mp.get_context('fork').Queue()
                else:
                    # sequential fallback: run everything in this process
                    executor_cls = DummyExecutor
                    log_queue = None
                executor = executor_cls(
                    max_workers=max_workers or 1,
                    # only forking method avoids pickling
                    context=mp.get_context('fork'),
                    # share processor instance as global to avoid pickling
                    initializer=_page_worker_set_ctxt,
                    initargs=(self, log_queue),
                )
                if max_workers > 1:
                    # forward messages from log queue (in subprocesses) to all root handlers
                    log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers,
                                                                  respect_handler_level=True)
                    log_listener.start()
                tasks = None
                try:
                    self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
                    tasks = self.process_workspace_submit_tasks(executor, max_seconds)
                    stats = self.process_workspace_handle_tasks(tasks)
                finally:
                    executor.shutdown(kill_workers=True, wait=False)
                    self._base_logger.debug("stopped executor %s after %d tasks", str(executor), len(tasks) if tasks else -1)
                    if max_workers > 1:
                        # NOTE(review): the listener is deliberately left running --
                        # stopping it here has caused hangs; confirm teardown strategy
                        # can cause deadlock:
                        #log_listener.stop()
                        # not much better:
                        #log_listener.enqueue_sentinel()
                        pass

            except NotImplementedError:
                # fall back to deprecated method
                try:
                    self.process()
                except Exception as err:
                    # suppress the NotImplementedError context
                    raise err from None
570
571
    def process_workspace_submit_tasks(self, executor: TExecutor, max_seconds: int) -> Dict[
            TFuture, Tuple[str, List[Optional[OcrdFileType]]]]:
        """
        Look up all input files of the given ``workspace``
        from the given :py:data:`input_file_grp`
        for the given :py:data:`page_id` (or all pages),
        and schedule calling :py:meth:`.process_page_file`
        on them for each page via `executor` (enforcing
        a per-page time limit of `max_seconds`).

        When running with `OCRD_MAX_PARALLEL_PAGES>1` and
        the workspace via METS Server, the executor will fork
        this many worker parallel subprocesses each processing
        one page at a time. (Interprocess communication is
        done via task and result queues.)

        Otherwise, tasks are run sequentially in the
        current process.

        Delegates to :py:meth:`.zip_input_files` to get
        the input files for each page, and then calls
        :py:meth:`.process_workspace_submit_page_task`.

        Returns a dict mapping the per-page tasks
        (i.e. futures submitted to the executor)
        to their corresponding pageId and input files.
        """
        submissions = (self.process_workspace_submit_page_task(executor, max_seconds, file_tuple)
                       for file_tuple in self.zip_input_files(on_error='abort', require_first=False))
        tasks = {task: (page_id, input_files)
                 for task, page_id, input_files in submissions}
        self._base_logger.debug("submitted %d processing tasks", len(tasks))
        return tasks
604
605
    def process_workspace_submit_page_task(self, executor: TExecutor, max_seconds: int,
                                           input_file_tuple: List[Optional[OcrdFileType]]) -> Tuple[
                                               TFuture, str, List[Optional[OcrdFileType]]]:
        """
        Ensure all input files for a single page are
        downloaded to the workspace, then schedule
        :py:meth:`.process_page_file` to be run on
        them via `executor` (enforcing a per-page time
        limit of `max_seconds`).

        Delegates to :py:meth:`.process_page_file`
        (wrapped in :py:func:`_page_worker` to share
        the processor instance across forked processes).

        \b
        Returns a tuple of:
        - the scheduled future object,
        - the corresponding pageId,
        - the corresponding input files.
        """
        # the pageId comes from the first fileGrp that actually has a file here
        page_id = next(input_file.pageId
                       for input_file in input_file_tuple
                       if input_file)
        self._base_logger.info(f"preparing page {page_id}")

        def resolve(input_file):
            # keep gaps (file/page not found in a fileGrp) as None
            if input_file is None:
                return None
            if not self.download:
                return input_file
            try:
                return self.workspace.download_file(input_file)
            except (ValueError, FileNotFoundError, HTTPError) as err:
                # best-effort: log, then fall back to the undownloaded file
                self._base_logger.error(repr(err))
                self._base_logger.warning(f"failed downloading file {input_file} for page {page_id}")
                return input_file

        input_files: List[Optional[OcrdFileType]] = [resolve(f) for f in input_file_tuple]
        # process page (via _page_worker so forked workers reuse the shared instance)
        return executor.submit(_page_worker, max_seconds, *input_files), page_id, input_files
645
646
    def process_workspace_handle_tasks(self, tasks: Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]) -> Tuple[
            int, int, Dict[str, int], int]:
        """
        Look up scheduled per-page futures one by one,
        handle errors (exceptions) and gather results.

        \b
        Enforces policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns a tuple of:
        - the number of successfully processed pages
        - the number of failed (i.e. skipped or copied) pages
        - a dict of the type and corresponding number of exceptions seen
        - the number of total requested pages (i.e. success+fail+existing).

        Delegates to :py:meth:`.process_workspace_handle_page_task`
        for each page.
        """
        # aggregate info for logging:
        nr_succeeded = 0
        nr_failed = 0
        nr_errors = defaultdict(int)  # count causes
        if config.OCRD_MISSING_OUTPUT == 'SKIP':
            reason = "skipped"
        elif config.OCRD_MISSING_OUTPUT == 'COPY':
            reason = "fallback-copied"
        else:
            # FIX: previously left unbound for other policies (e.g. ABORT),
            # causing a NameError in the failure-reporting paths below
            reason = "missing"
        for task in tasks:
            # wait for results, handle errors
            page_id, input_files = tasks[task]
            result = self.process_workspace_handle_page_task(page_id, input_files, task)
            if isinstance(result, Exception):
                nr_errors[result.__class__.__name__] += 1
                nr_failed += 1
                # FIXME: this is just prospective, because len(tasks)==nr_failed+nr_succeeded is not guaranteed
                if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / len(tasks) > config.OCRD_MAX_MISSING_OUTPUTS:
                    # already irredeemably many failures, stop short
                    nr_errors = dict(nr_errors)
                    raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_failed+nr_succeeded}, "
                                    f"{str(nr_errors)})")
            elif result:
                nr_succeeded += 1
            # else skipped - already exists
        nr_errors = dict(nr_errors)
        nr_all = nr_succeeded + nr_failed
        if nr_failed > 0:
            # final (exact) check of the failure ratio against the policy threshold
            if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / nr_all > config.OCRD_MAX_MISSING_OUTPUTS:
                raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_all}, {str(nr_errors)})")
            self._base_logger.warning("%s %d of %d pages due to %s", reason, nr_failed, nr_all, str(nr_errors))
        self._base_logger.debug("succeeded %d, missed %d of %d pages due to %s",
                                nr_succeeded, nr_failed, nr_all, str(nr_errors))
        return nr_succeeded, nr_failed, nr_errors, len(tasks)
702
703
    def process_workspace_handle_page_task(self, page_id: str, input_files: List[Optional[OcrdFileType]],
                                           task: TFuture) -> Union[bool, Exception]:
        """
        \b
        Await a single page result and handle errors (exceptions),
        enforcing policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns
        - true in case of success
        - false in case the output already exists
        - the exception in case of failure

        Raises:
            ValueError: if `OCRD_EXISTING_OUTPUT` or `OCRD_MISSING_OUTPUT`
                hold an unknown configuration value.
        """
        # FIXME: differentiate error cases in various ways:
        # - ResourceNotFoundError → use ResourceManager to download (once), then retry
        # - transient (I/O or OOM) error → maybe sleep, retry
        # - persistent (data) error → skip / dummy / raise
        try:
            self._base_logger.debug("waiting for output of task %s (page %s)", task, page_id)
            # timeout kwarg on future is useless: it only raises TimeoutError here,
            # but does not stop the running process/thread, and executor itself
            # offers nothing to that effect:
            # task.result(timeout=max_seconds or None)
            # so we instead applied the timeout within the worker function
            task.result()
            return True
        except NotImplementedError:
            # exclude NotImplementedError, so we can try process() below
            raise
        # handle input failures separately
        except FileExistsError as err:
            if config.OCRD_EXISTING_OUTPUT == 'ABORT':
                raise err
            if config.OCRD_EXISTING_OUTPUT == 'SKIP':
                return False
            if config.OCRD_EXISTING_OUTPUT == 'OVERWRITE':
                # too late here, must not happen
                raise Exception(f"got {err} despite OCRD_EXISTING_OUTPUT==OVERWRITE")
            # fix: previously fell through silently (implicit None, counted as
            # "already exists"); treat unknown values as a configuration error,
            # consistent with the OCRD_MISSING_OUTPUT handling below
            desc = config.describe('OCRD_EXISTING_OUTPUT', wrap_text=False, indent_text=False)
            raise ValueError(f"unknown configuration value {config.OCRD_EXISTING_OUTPUT} - {desc}")
        except KeyboardInterrupt:
            raise
        # broad coverage of output failures (including TimeoutError)
        except Exception as err:
            # FIXME: add re-usable/actionable logging
            if config.OCRD_MISSING_OUTPUT == 'ABORT':
                self._base_logger.error(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
                raise err
            self._base_logger.exception(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
            if config.OCRD_MISSING_OUTPUT == 'SKIP':
                pass
            elif config.OCRD_MISSING_OUTPUT == 'COPY':
                self._copy_page_file(input_files[0])
            else:
                desc = config.describe('OCRD_MISSING_OUTPUT', wrap_text=False, indent_text=False)
                raise ValueError(f"unknown configuration value {config.OCRD_MISSING_OUTPUT} - {desc}")
            return err
762
763
    def _copy_page_file(self, input_file: OcrdFileType) -> None:
        """
        Copy the given ``input_file`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`~ocrd_models.OcrdFile` per input fileGrp)
        and add it as if it was a processing result.
        """
        assert isinstance(input_file, get_args(OcrdFileType))
        self._base_logger.debug(f"parsing file {input_file.ID} for page {input_file.pageId}")
        try:
            pcgts: OcrdPage = page_from_file(input_file)
        except ValueError as err:
            # not PAGE and not an image to generate PAGE for
            self._base_logger.error(f"non-PAGE input for page {input_file.pageId}: {err}")
            return
        output_file_id = make_file_id(input_file, self.output_file_grp)
        pcgts.set_pcGtsId(output_file_id)
        self.add_metadata(pcgts)
        self.workspace.add_file(
            file_id=output_file_id,
            file_grp=self.output_file_grp,
            page_id=input_file.pageId,
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(pcgts),
        )
790
791
    def process_page_file(self, *input_files: Optional[OcrdFileType]) -> None:
        """
        Process the given ``input_files`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`.OcrdFile` per input fileGrp)
        under the given :py:data:`.parameter`, and make sure the
        results get added accordingly.

        (This uses :py:meth:`.process_page_pcgts`, but should be overridden by subclasses
        to handle cases like multiple output fileGrps, non-PAGE input etc.)
        """
        # use the first fileGrp that actually has a file for this page
        input_pos = next(i for i, input_file in enumerate(input_files) if input_file is not None)
        page_id = input_files[input_pos].pageId
        self._base_logger.info("processing page %s", page_id)
        input_pcgts = self._parse_page_input_files(input_files, page_id)
        if not any(input_pcgts):
            self._base_logger.warning(f'skipping page {page_id}')
            return
        output_file_id = self._make_page_output_id(input_files[input_pos])
        result = self.process_page_pcgts(*input_pcgts, page_id=page_id)
        for image_result in result.images:
            self._save_page_image_result(image_result, output_file_id, page_id)
        result.pcgts.set_pcGtsId(output_file_id)
        self.add_metadata(result.pcgts)
        self.workspace.add_file(
            file_id=output_file_id,
            file_grp=self.output_file_grp,
            page_id=page_id,
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(result.pcgts),
        )

    def _parse_page_input_files(self, input_files, page_id) -> List[Optional[OcrdPage]]:
        """
        Parse each per-fileGrp input file of one page into an :py:class:`.OcrdPage`
        (or None, for gaps / unfetchable / non-PAGE files), enforcing
        `OCRD_MISSING_INPUT` (abort/skip).
        """
        input_pcgts: List[Optional[OcrdPage]] = [None] * len(input_files)
        for i, input_file in enumerate(input_files):
            grp = self.input_file_grp.split(',')[i]
            if input_file is None:
                self._base_logger.debug(f"ignoring missing file for input fileGrp {grp} for page {page_id}")
                continue
            assert isinstance(input_file, get_args(OcrdFileType))
            if not input_file.local_filename:
                self._base_logger.error(f'No local file exists for page {page_id} in file group {grp}')
                if config.OCRD_MISSING_INPUT == 'ABORT':
                    raise MissingInputFile(grp, page_id, input_file.mimetype)
                continue
            self._base_logger.debug(f"parsing file {input_file.ID} for page {page_id}")
            try:
                page_ = page_from_file(input_file)
                assert isinstance(page_, OcrdPage)
                input_pcgts[i] = page_
            except ValueError as err:
                # not PAGE and not an image to generate PAGE for
                self._base_logger.error(f"non-PAGE input for page {page_id}: {err}")
        return input_pcgts

    def _make_page_output_id(self, input_file) -> str:
        """
        Derive the output file ID from ``input_file`` (re-using its ID verbatim
        when input and output fileGrp coincide), enforcing `OCRD_EXISTING_OUTPUT`
        by raising FileExistsError early if the ID is already taken.
        """
        output_file_id = make_file_id(input_file, self.output_file_grp)
        if input_file.fileGrp == self.output_file_grp:
            # input=output fileGrp: re-use ID exactly
            output_file_id = input_file.ID
        output_file = next(self.workspace.mets.find_files(ID=output_file_id), None)
        if output_file and config.OCRD_EXISTING_OUTPUT != 'OVERWRITE':
            # short-cut avoiding useless computation:
            raise FileExistsError(
                f"A file with ID=={output_file_id} already exists {output_file} and neither force nor ignore are set"
            )
        return output_file_id

    def _save_page_image_result(self, image_result, output_file_id, page_id) -> None:
        """
        Save one derived image of a page processing result to the output fileGrp
        and reference it in the PAGE result (either replacing the original image,
        or as AlternativeImage, or not at all).
        """
        image_file_id = f'{output_file_id}_{image_result.file_id_suffix}'
        image_file_path = join(self.output_file_grp, f'{image_file_id}.png')
        if isinstance(image_result.alternative_image, PageType):
            # special case: not an alternative image, but replacing the original image
            # (this is needed by certain processors when the original's coordinate system
            #  cannot or must not be kept)
            image_result.alternative_image.set_imageFilename(image_file_path)
            image_result.alternative_image.set_imageWidth(image_result.pil.width)
            image_result.alternative_image.set_imageHeight(image_result.pil.height)
        elif isinstance(image_result.alternative_image, AlternativeImageType):
            image_result.alternative_image.set_filename(image_file_path)
        elif image_result.alternative_image is None:
            pass  # do not reference in PAGE result
        else:
            raise ValueError(f"process_page_pcgts returned an OcrdPageResultImage of unknown type "
                             f"{type(image_result.alternative_image)}")
        self.workspace.save_image_file(
            image_result.pil,
            image_file_id,
            self.output_file_grp,
            page_id=page_id,
            file_path=image_file_path,
        )
873
874
    def process_page_pcgts(self, *input_pcgts: Optional[OcrdPage], page_id: Optional[str] = None) -> OcrdPageResult:
        """
        Process one physical page of the :py:data:`.workspace`, given as one
        parsed :py:class:`.OcrdPage` per input fileGrp in ``input_pcgts``,
        under the current :py:data:`.parameter`, and return an
        :py:class:`.OcrdPageResult`.

        Implementations may append :py:class:`.OcrdPageResultImage` instances
        to the result's ``images`` attribute; each needs ``pil``
        (:py:class:`PIL.Image` image data), ``file_id_suffix`` (used for
        generating IDs of the saved image) and ``alternative_image``
        (the :py:class:`ocrd_models.ocrd_page.AlternativeImageType` reference
        whose filename gets set to the saved image).

        (This contains the main functionality and must be overridden by subclasses,
        unless it does not get called by some overriden :py:meth:`.process_page_file`.)
        """
        raise NotImplementedError()
893
894
    def add_metadata(self, pcgts: OcrdPage) -> None:
        """
        Append a PAGE-XML :py:class:`~ocrd_models.ocrd_page.MetadataItemType`
        ``MetadataItem`` of type ``processingStep`` to :py:class:`.OcrdPage`
        ``pcgts``, recording this processor's executable, step, runtime
        parameters and version information.
        """
        # runtime parameters of this invocation
        parameter_labels = LabelsType(
            externalModel="ocrd-tool",
            externalId="parameters",
            Label=[LabelType(type_=name, value=self.parameter[name])
                   for name in self.parameter.keys()])
        # tool and framework versions
        version_labels = LabelsType(
            externalModel="ocrd-tool",
            externalId="version",
            Label=[LabelType(type_=self.ocrd_tool['executable'], value=self.version),
                   LabelType(type_='ocrd/core', value=OCRD_VERSION)])
        metadata_obj = pcgts.get_Metadata()
        assert metadata_obj is not None
        metadata_obj.add_MetadataItem(MetadataItemType(
            type_="processingStep",
            name=self.ocrd_tool['steps'][0],
            value=self.ocrd_tool['executable'],
            Labels=[parameter_labels, version_labels]))
920
921
    def resolve_resource(self, val):
        """
        Resolve a resource name to an absolute file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_

        Args:
            val (string): resource value to resolve

        Returns:
            the absolute path of the first existing candidate

        Raises:
            ResourceNotFoundError: if no candidate location exists
        """
        executable = self.ocrd_tool['executable']
        if exists(val):
            # already an existing (absolute or CWD-relative) path
            # (fix: lazy %-args instead of eager % formatting in log calls)
            self._base_logger.debug("Resolved to absolute path %s", val)
            return val
        # FIXME: remove once workspace arg / old_pwd is gone:
        cwd = self.old_pwd if hasattr(self, 'old_pwd') else getcwd()
        ret = [cand for cand in list_resource_candidates(executable, val,
                                                         cwd=cwd, moduled=self.moduledir)
               if exists(cand)]
        if ret:
            self._base_logger.debug("Resolved %s to absolute path %s", val, ret[0])
            return ret[0]
        raise ResourceNotFoundError(val, executable)
945
946
    def show_resource(self, val):
        """
        Resolve a resource name to a file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_,
        then print its contents to stdout.

        Args:
            val (string): resource value to show
        """
        location = Path(self.resolve_resource(val))
        if not location.is_dir():
            # plain file: dump raw bytes
            sys.stdout.buffer.write(location.read_bytes())
            return
        # directory resource: stream it as a gzipped tarball
        with pushd_popd(location):
            buffer = io.BytesIO()
            with tarfile.open(fileobj=buffer, mode='w:gz') as archive:
                archive.add('.')
            buffer.seek(0)
            copyfileobj(buffer, sys.stdout.buffer)
966
967
    def list_all_resources(self):
        """
        List all resources found in the filesystem and matching content-type by filename suffix
        """
        mimetypes = get_processor_resource_types(None, self.ocrd_tool)
        accept_anything = '*/*' in mimetypes
        for candidate in list_all_resources(self.ocrd_tool['executable'], moduled=self.moduledir):
            candidate = Path(candidate)
            if not accept_anything:
                if candidate.is_dir() and 'text/directory' not in mimetypes:
                    continue
                # if we do not know all MIME types, then keep the file, otherwise require suffix match
                if candidate.is_file():
                    if not any(candidate.suffix == MIME_TO_EXT.get(mime, candidate.suffix)
                               for mime in mimetypes):
                        continue
            yield candidate
982
983
    @property
    def module(self):
        """
        The top-level module this processor belongs to.

        Returns the shortest dotted prefix of ``self.__module__`` that is a
        real (non-namespace) module, i.e. whose entry in ``sys.modules`` has
        a ``__file__``; falls back to ``self.__module__`` itself.
        """
        parts = self.__module__.split('.')
        for depth in range(1, len(parts) + 1):
            prefix = '.'.join(parts[:depth])
            if getattr(sys.modules[prefix], '__file__', None):
                return prefix
        # fall-back
        return self.__module__
998
999
    @property
    def moduledir(self):
        """
        The filesystem path of the module directory
        (location of :py:attr:`module` on disk, used as base
        for bundled resource file lookup).
        """
        return resource_filename(self.module, '.')
1005
1006
    @property
    def input_files(self):
        """
        List the input files (for single-valued :py:attr:`input_file_grp`).

        For each physical page:

        - If there is a single PAGE-XML for the page, take it (and forget about all
          other files for that page)
        - Else if there is a single image file, take it (and forget about all other
          files for that page)
        - Otherwise raise an error (complaining that only PAGE-XML warrants
          having multiple images for a single page)

        See `algorithm <https://github.com/cisocrgroup/ocrd_cis/pull/57#issuecomment-656336593>`_

        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` objects.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")
        file_tuples = self.zip_input_files(mimetype=None, on_error='abort')
        if not file_tuples:
            return []
        assert len(file_tuples[0]) == 1, 'Use zip_input_files() instead of input_files when processing multiple input fileGrps'
        return [file_tuple[0] for file_tuple in file_tuples]
1032
1033
    def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
        """
        List tuples of input files (for multi-valued :py:attr:`input_file_grp`).

        Processors that expect/need multiple input file groups,
        cannot use :py:data:`input_files`. They must align (zip) input files
        across pages. This includes the case where not all pages
        are equally present in all file groups. It also requires
        making a consistent selection if there are multiple files
        per page.

        Following the OCR-D functional model, this function tries to
        find a single PAGE file per page, or fall back to a single
        image file per page. In either case, multiple matches per page
        are an error (see error handling below).
        This default behaviour can be changed by using a fixed MIME
        type filter via :py:attr:`mimetype`. But still, multiple matching
        files per page are an error.

        Single-page multiple-file errors are handled according to
        :py:attr:`on_error`:

        - if ``skip``, then the page for the respective fileGrp will be
          silently skipped (as if there was no match at all)
        - if ``first``, then the first matching file for the page will be
          silently selected (as if the first was the only match)
        - if ``last``, then the last matching file for the page will be
          silently selected (as if the last was the only match)
        - if ``abort``, then an exception will be raised.

        Multiple matches for PAGE-XML will always raise an exception.

        Keyword Args:
             require_first (boolean): If true, then skip a page entirely
                 whenever it is not available in the first input `fileGrp`.
             on_error (string): How to handle multiple file matches per page.
             mimetype (string): If not `None`, filter by the specified MIME
                 type (literal or regex prefixed by `//`). Otherwise prefer
                 PAGE or image.
        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` tuples.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")

        def resolve_conflict(kept, new, ifg, err_mimetype):
            # apply the on_error policy for two files matching the same
            # page in the same fileGrp; returns the file to keep, or None
            # to drop this page for this fileGrp
            # (fix: deduplicates the two previously copy-pasted branches)
            if on_error == 'skip':
                return None
            if on_error == 'first':
                return kept  # keep first match
            if on_error == 'last':
                return new
            if on_error == 'abort':
                raise NonUniqueInputFile(ifg, new.pageId, err_mimetype)
            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)

        ifgs = self.input_file_grp.split(",")
        # Iterating over all files repeatedly may seem inefficient at first sight,
        # but the unnecessary OcrdFile instantiations for posterior fileGrp filtering
        # can actually be much more costly than traversing the ltree.
        # This might depend on the number of pages vs number of fileGrps.

        pages = {}
        for i, ifg in enumerate(ifgs):
            files_ = sorted(self.workspace.mets.find_all_files(
                    pageId=self.page_id, fileGrp=ifg, mimetype=mimetype),
                                # sort by MIME type so PAGE comes before images
                                key=lambda file_: file_.mimetype)
            for file_ in files_:
                if not file_.pageId:
                    # ignore document-global files
                    continue
                ift = pages.setdefault(file_.pageId, [None] * len(ifgs))
                if ift[i]:
                    self._base_logger.debug(
                        f"another file {file_.ID} for page {file_.pageId} in input file group {ifg}")
                    # fileGrp has multiple files for this page ID
                    if mimetype:
                        # filter was active, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} of same MIME type {mimetype} - on_error={on_error}")
                        ift[i] = resolve_conflict(ift[i], file_, ifg, mimetype)
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype != MIMETYPE_PAGE):
                        pass  # keep PAGE match
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype == MIMETYPE_PAGE):
                        raise NonUniqueInputFile(ifg, file_.pageId, None)
                    else:
                        # filter was inactive but no PAGE is in control, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} but no PAGE available - on_error={on_error}")
                        ift[i] = resolve_conflict(ift[i], file_, ifg, None)
                else:
                    self._base_logger.debug(f"adding file {file_.ID} for page {file_.pageId} to input file group {ifg}")
                    ift[i] = file_
        # Warn if no files found but pageId was specified, because that might be due to invalid page_id (range)
        if self.page_id and not any(pages):
            # fix: typo 'orcd' → 'ocrd' in the suggested command
            self._base_logger.critical(f"Could not find any files for selected pageId {self.page_id}.\n"
                                       f"compare '{self.page_id}' with the output of 'ocrd workspace list-page'.")
        ifts = []
        # use physical page order
        for page in self.workspace.mets.physical_pages:
            if page not in pages:
                continue
            ifiles = pages[page]
            for i, ifg in enumerate(ifgs):
                if not ifiles[i]:
                    # could be from non-unique with on_error=skip or from true gap
                    self._base_logger.error(f'Found no file for page {page} in file group {ifg}')
                    if config.OCRD_MISSING_INPUT == 'ABORT':
                        raise MissingInputFile(ifg, page, mimetype)
            if not any(ifiles):
                # must be from non-unique with on_error=skip
                self._base_logger.warning(f'Found no files for {page} - skipping')
                continue
            if ifiles[0] or not require_first:
                ifts.append(tuple(ifiles))
        return ifts
1161
1162
1163
_page_worker_processor = None
1164
"""
1165
This global binding for the processor is required to avoid
1166
squeezing the processor through a mp.Queue (which is impossible
1167
due to unpicklable attributes like .workspace.mets._tree anyway)
1168
when calling Processor.process_page_file as page worker processes
1169
in Processor.process_workspace. Forking allows inheriting global
1170
objects, and with the METS Server we do not mutate the local
1171
processor instance anyway.
1172
"""
1173
1174
1175
def _page_worker_set_ctxt(processor, log_queue):
1176
    """
1177
    Overwrites `ocrd.processor.base._page_worker_processor` instance
1178
    for sharing with subprocesses in ProcessPoolExecutor initializer.
1179
    """
1180
    global _page_worker_processor
1181
    _page_worker_processor = processor
1182
    if log_queue:
1183
        # replace all log handlers with just one queue handler
1184
        logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]
1185
1186
1187
def _page_worker(timeout, *input_files):
    """
    Wraps a `Processor.process_page_file` call as payload (call target)
    of the ProcessPoolExecutor workers, but also enforces the given timeout.

    Args:
        timeout: maximum number of seconds to wait for the page
            (0 or None: wait indefinitely)
        input_files: one file per input fileGrp (may contain None for gaps)
    """
    page_id = next((file.pageId for file in input_files
                    if hasattr(file, 'pageId')), "")
    # run the actual call in a single-thread pool, merely to get a
    # waitable handle that supports a timeout
    pool = ThreadPool(processes=1)
    try:
        async_result = pool.apply_async(_page_worker_processor.process_page_file, input_files)
        async_result.get(timeout or None)
        _page_worker_processor.logger.debug("page worker completed for page %s", page_id)
    except mp.TimeoutError:
        _page_worker_processor.logger.debug("page worker timed out for page %s", page_id)
        raise
    finally:
        # fix: release the pool's worker thread; previously the pool was
        # leaked on every page (never closed/terminated)
        pool.terminate()
1203
1204
1205
def _generate_doc_help(processor_instance):
    """Collect and wrap the most relevant docstrings of ``processor_instance`` (or return '')."""
    if not processor_instance:
        return ''
    doc_help = ''
    module = inspect.getmodule(processor_instance)
    if module and module.__doc__:
        doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
    if processor_instance.__doc__:
        doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
    # Try to find the most concrete docstring among the various methods that an implementation
    # could overload, first serving.
    # In doing so, compare with Processor to avoid a glitch in the way py>=3.5 inherits docstrings.
    # (They are supposed to only repeat information inspect.getdoc, rather than inherit __doc__ itself.)
    for method in ['process_page_pcgts', 'process_page_file', 'process_workspace', 'process']:
        instance_method = getattr(processor_instance, method)
        superclass_method = getattr(Processor, method)
        if instance_method.__doc__ and instance_method.__doc__ != superclass_method.__doc__:
            doc_help += '\n' + inspect.cleandoc(instance_method.__doc__) + '\n'
            break
    if doc_help:
        doc_help = '\n\n' + wrap_text(doc_help, width=72,
                                      initial_indent='  > ',
                                      subsequent_indent='  > ',
                                      preserve_paragraphs=True)
    return doc_help


def _generate_parameter_help(ocrd_tool):
    """Format the ``parameters`` section of ``ocrd_tool`` for CLI help output."""
    if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']:
        return '  NONE\n'

    def wrap(s):
        return wrap_text(s, initial_indent=' '*3,
                         subsequent_indent=' '*4,
                         width=72, preserve_paragraphs=True)
    parameter_help = ''
    for param_name, param in ocrd_tool['parameters'].items():
        parameter_help += wrap('"%s" [%s%s]' % (
            param_name,
            param['type'],
            ' - REQUIRED' if 'required' in param and param['required'] else
            ' - %s' % json.dumps(param['default']) if 'default' in param else ''))
        parameter_help += '\n ' + wrap(param['description'])
        if 'enum' in param:
            parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum']))
        parameter_help += "\n"
    return parameter_help


def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
    """Generate a string describing the full CLI of this processor including params.

    Args:
         ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
         processor_instance (object, optional): the processor implementation
             (for adding any module/class/function docstrings)
        subcommand (string): 'worker' or 'server'

    Raises:
        ValueError: if ``subcommand`` is given but neither 'worker' nor 'server'
    """
    doc_help = _generate_doc_help(processor_instance)
    parameter_help = _generate_parameter_help(ocrd_tool)

    subcommands = '''\
    worker      Start a processing worker rather than do local processing
    server      Start a processor server rather than do local processing
'''

    processing_worker_options = '''\
  --queue                         The RabbitMQ server address in format
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
                                  [amqp://admin:admin@localhost:5672]
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
  --log-filename                  Filename to redirect STDOUT/STDERR to,
                                  if specified.
'''

    processing_server_options = '''\
  --address                       The Processor server address in format
                                  "{host}:{port}"
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
'''

    processing_options = '''\
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
  -I, --input-file-grp USE        File group(s) used as input
  -O, --output-file-grp USE       File group(s) used as output
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
  --overwrite                     Remove existing output pages/images
                                  (with "--page-id", remove only those).
                                  Short-hand for OCRD_EXISTING_OUTPUT=OVERWRITE
  --debug                         Abort on any errors with full stack trace.
                                  Short-hand for OCRD_MISSING_OUTPUT=ABORT
  --profile                       Enable profiling
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
                                  or JSON file path
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                  taking precedence over --parameter
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
                                  If URL starts with http:// start an HTTP server there,
                                  otherwise URL is a path to an on-demand-created unix socket
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
                                  Override log level globally [INFO]
  --log-filename LOG-PATH         File to redirect stderr logging to (overriding ocrd_logging.conf).
'''

    information_options = '''\
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
  -L, --list-resources            List names of processor resources
  -J, --dump-json                 Dump tool description as JSON
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
  -h, --help                      Show this message
  -V, --version                   Show version
'''

    if not subcommand:
        return f'''\
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]

  {ocrd_tool['description']}{doc_help}

Subcommands:
{subcommands}
Options for processing:
{processing_options}
Options for information:
{information_options}
Parameters:
{parameter_help}
'''
    elif subcommand == 'worker':
        return f'''\
Usage: {ocrd_tool['executable']} worker [OPTIONS]

  Run {ocrd_tool['executable']} as a processing worker.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_worker_options}
'''
    elif subcommand == 'server':
        # fixed typo in user-facing help text ("sever" -> "server")
        return f'''\
Usage: {ocrd_tool['executable']} server [OPTIONS]

  Run {ocrd_tool['executable']} as a processor server.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_server_options}
'''
    else:
        # previously fell through and silently returned None
        raise ValueError("Unknown subcommand '%s'" % subcommand)