ocrd.processor.base.Processor.process_page_file()   F

Complexity
    Conditions: 14

Size
    Total Lines: 81
    Code Lines: 61

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric   Value
eloc     61
dl       0
loc      81
rs       3.6
c        0
b        0
f        0
cc       14
nop      2
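For orientation: cc is the cyclomatic complexity, i.e. one plus the number of decision points (if/elif branches, loops, boolean short-circuits, exception handlers), and nop is the number of parameters. A minimal sketch of how decision points add up (illustrative only, not taken from the analyzed module):

def classify(n):             # complexity starts at 1 (a single path)
    if n < 0:                # +1 decision point
        return 'negative'
    for digit in str(n):     # +1 decision point
        if digit == '0':     # +1 decision point
            return 'contains zero'
    return 'non-negative'    # cyclomatic complexity: 4

With cc 14, process_page_file() sits well above the commonly used warning threshold of 10.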

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

- Extract Method
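For illustration, a minimal Extract Method sketch (hypothetical names, not taken from the analyzed code), where a commented block becomes a helper named after its comment:

# before: one long method with a commented section
def report(measurements):
    # compute mean and spread
    mean = sum(measurements) / len(measurements)
    spread = max(measurements) - min(measurements)
    return f"mean={mean} spread={spread}"

# after: the commented section is extracted into a well-named helper
def mean_and_spread(measurements):
    mean = sum(measurements) / len(measurements)
    return mean, max(measurements) - min(measurements)

def report(measurements):
    mean, spread = mean_and_spread(measurements)
    return f"mean={mean} spread={spread}"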

Complexity

Complex methods like ocrd.processor.base.Processor.process_page_file() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its enclosing class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
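For illustration, a minimal Extract Class sketch (hypothetical names, unrelated to Processor), where fields sharing a prefix move into a class of their own:

# before: the 'page_' prefix hints at a hidden component
class Job:
    def __init__(self):
        self.page_count = 0
        self.page_timeout = 30

# after: the cohesive fields become their own class
class PagePolicy:
    def __init__(self, count=0, timeout=30):
        self.count = count
        self.timeout = timeout

class Job:
    def __init__(self):
        self.pages = PagePolicy()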

"""
Processor base class and helper functions.
"""

__all__ = [
    'Processor',
    'generate_processor_help',
    'run_cli',
    'run_processor'
]

from functools import cached_property
from os.path import exists, join
from shutil import copyfileobj
import json
import os
from os import getcwd
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, get_args
import sys
import logging
import logging.handlers
import inspect
import tarfile
import io
from collections import defaultdict
from frozendict import frozendict
# concurrent.futures is buggy in py38,
# this is where the fixes came from:
from loky import Future, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing.pool import ThreadPool

from click import wrap_text
from deprecated import deprecated
from requests import HTTPError

from ..workspace import Workspace
from ..mets_server import ClientSideOcrdMets
from ocrd_models.ocrd_file import OcrdFileType
from .ocrd_page_result import OcrdPageResult
from ocrd_utils import (
    VERSION as OCRD_VERSION,
    MIMETYPE_PAGE,
    config,
    getLogger,
    list_resource_candidates,
    list_all_resources,
    get_processor_resource_types,
    resource_filename,
    parse_json_file_with_comments,
    pushd_popd,
    make_file_id,
    deprecation_warning
)
from ocrd_validators import ParameterValidator
from ocrd_models.ocrd_page import (
    PageType,
    AlternativeImageType,
    MetadataItemType,
    LabelType,
    LabelsType,
    OcrdPage,
    to_xml,
)
from ocrd_modelfactory import page_from_file
from ocrd_validators.ocrd_tool_validator import OcrdToolValidator

# XXX imports must remain for backwards-compatibility
from .helpers import run_cli, run_processor  # pylint: disable=unused-import


class ResourceNotFoundError(FileNotFoundError):
    """
    An exception signifying the requested processor resource
    cannot be resolved.
    """
    def __init__(self, name, executable):
        self.name = name
        self.executable = executable
        self.message = (f"Could not find resource '{name}' for executable '{executable}'. "
                        f"Try 'ocrd resmgr download {executable} {name}' to download this resource.")
        super().__init__(self.message)


class NonUniqueInputFile(ValueError):
    """
    An exception signifying the specified fileGrp / pageId / mimetype
    selector yields multiple PAGE files, or no PAGE files but multiple images,
    or multiple files of that mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        self.message = (f"Could not determine unique input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
        super().__init__(self.message)


class MissingInputFile(ValueError):
    """
    An exception signifying the specified fileGrp / pageId / mimetype
    selector yields no PAGE files, or no PAGE and no image files,
    or no files of that mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        self.message = (f"Could not find input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
        super().__init__(self.message)


class DummyFuture:
    """
    Mimics some of `concurrent.futures.Future` but runs immediately.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def result(self):
        return self.fn(*self.args, **self.kwargs)


class DummyExecutor:
    """
    Mimics some of `concurrent.futures.ProcessPoolExecutor` but runs
    everything immediately in this process.
    """
    def __init__(self, initializer=None, initargs=(), **kwargs):
        initializer(*initargs)

    def shutdown(self, **kwargs):
        # allow gc to catch processor instance (unless cached)
        _page_worker_set_ctxt(None, None)

    def submit(self, fn, *args, **kwargs) -> DummyFuture:
        return DummyFuture(fn, *args, **kwargs)


TFuture = Union[DummyFuture, Future]
TExecutor = Union[DummyExecutor, ProcessPoolExecutor]


class Processor():
    """
    A processor is a tool that implements the uniform OCR-D
    `command-line interface for run-time data processing <https://ocr-d.de/en/spec/cli>`_.

    That is, it executes a single workflow step, or a combination of workflow steps,
    on the workspace (represented by local METS). It reads input files for all or selected
    physical pages of the input fileGrp(s), computes additional annotation, and writes output
    files for them into the output fileGrp(s). It may take a number of optional or mandatory
    parameters.
    """

    max_instances: int = -1
    """
    maximum number of cached instances (ignored if negative), to be applied on top of
    :py:data:`~ocrd_utils.config.OCRD_MAX_PROCESSOR_CACHE` (i.e. whatever is smaller).

    (Override this if you know how many instances fit into memory - GPU / CPU RAM - at once.)
    """

    max_workers: int = -1
    """
    maximum number of processor forks for page-parallel processing (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_MAX_PARALLEL_PAGES` (i.e.
    whatever is smaller).

    (Override this if you know how many pages fit into processing units - GPU shaders / CPU cores
    - at once, or if your class already creates threads prior to forking, e.g. during ``setup``.)
    """

    max_page_seconds: int = -1
    """
    maximum number of seconds may be spent processing a single page (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_PROCESSING_PAGE_TIMEOUT`
    (i.e. whatever is smaller).

    (Override this if you know how costly this processor may be, irrespective of image size
    or complexity of the page.)
    """

    @property
    def metadata_filename(self) -> str:
        """
        Relative location of the ``ocrd-tool.json`` file inside the package.

        Used by :py:data:`metadata_location`.

        (Override if ``ocrd-tool.json`` is not in the root of the module,
        e.g. ``namespace/ocrd-tool.json`` or ``data/ocrd-tool.json``).
        """
        return 'ocrd-tool.json'

    @cached_property
    def metadata_location(self) -> Path:
        """
        Absolute path of the ``ocrd-tool.json`` file as distributed with the package.

        Used by :py:data:`metadata_rawdict`.

        (Override if ``ocrd-tool.json`` is not distributed with the Python package.)
        """
        module = inspect.getmodule(self)
        module_tokens = module.__package__.split('.')
        # for namespace packages, we cannot just use the first token
        for i in range(len(module_tokens)):
            prefix = '.'.join(module_tokens[:i + 1])
            if sys.modules[prefix].__spec__.has_location:
                return resource_filename(prefix, self.metadata_filename)
        raise Exception(f"cannot find top-level module prefix for {module.__package__}")

    @cached_property
    def metadata_rawdict(self) -> dict:
        """
        Raw (unvalidated, unexpanded) ``ocrd-tool.json`` dict contents of the package.

        Used by :py:data:`metadata`.

        (Override if ``ocrd-tool.json`` is not in a file.)
        """
        return parse_json_file_with_comments(self.metadata_location)

    @cached_property
    def metadata(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of the package, according to the OCR-D
        `spec <https://ocr-d.de/en/spec/ocrd_tool>`_ for processor tools.

        After deserialisation, it also gets validated against the
        `schema <https://ocr-d.de/en/spec/ocrd_tool#definition>`_ with all defaults
        expanded.

        Used by :py:data:`ocrd_tool` and :py:data:`version`.

        (Override if you want to provide metadata programmatically instead of a
        JSON file.)
        """
        metadata = self.metadata_rawdict
        report = OcrdToolValidator.validate(metadata)
        if not report.is_valid:
            self.logger.error(f"The ocrd-tool.json of this processor is {'problematic' if not report.errors else 'invalid'}:\n"
                              f"{report.to_xml()}.\nPlease open an issue at {metadata.get('git_url', 'the website')}.")
        return metadata

    @cached_property
    def version(self) -> str:
        """
        The program version of the package.
        Usually the ``version`` part of :py:data:`metadata`.

        (Override if you do not want to use :py:data:`metadata` lookup
        mechanism.)
        """
        return self.metadata['version']

    @cached_property
    def executable(self) -> str:
        """
        The executable name of this processor tool. Taken from the runtime
        filename.

        Used by :py:data:`ocrd_tool` for lookup in :py:data:`metadata`.

        (Override if your entry-point name deviates from the ``executable``
        name, or the processor gets instantiated from another runtime.)
        """
        return os.path.basename(inspect.stack()[-1].filename)

    @cached_property
    def ocrd_tool(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of this processor tool.
        Usually the :py:data:`executable` key of the ``tools`` part
        of :py:data:`metadata`.

        (Override if you do not want to use :py:data:`metadata` lookup
        mechanism.)
        """
        return self.metadata['tools'][self.executable]

    @property
    def parameter(self) -> Optional[dict]:
        """the runtime parameter dict to be used by this processor"""
        if hasattr(self, '_parameter'):
            return self._parameter
        return None

    @parameter.setter
    def parameter(self, parameter: dict) -> None:
        if self.parameter is not None:
            self.shutdown()
        parameterValidator = ParameterValidator(self.ocrd_tool)
        report = parameterValidator.validate(parameter)
        if not report.is_valid:
            raise ValueError(f'Invalid parameters:\n{report.to_xml()}')
        # make parameter dict read-only
        self._parameter = frozendict(parameter)
        # (re-)run setup to load models etc
        self.setup()

    def __init__(
            self,
            # FIXME: remove in favor of process_workspace(workspace)
            workspace: Optional[Workspace],
            ocrd_tool=None,
            parameter=None,
            input_file_grp=None,
            output_file_grp=None,
            page_id=None,
            version=None
    ):
        """
        Instantiate, but do not setup (neither for processing nor other usage).
        If given, do parse and validate :py:data:`.parameter`.

        Args:
             workspace (:py:class:`~ocrd.Workspace`): The workspace to process. \
                 If not ``None``, then `chdir` to that directory.
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        Keyword Args:
             parameter (string): JSON of the runtime choices for ocrd-tool ``parameters``. \
                 Can be ``None`` even for processing, but then needs to be set before running.
             input_file_grp (string): comma-separated list of METS ``fileGrp`` used for input. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             output_file_grp (string): comma-separated list of METS ``fileGrp`` used for output. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             page_id (string): comma-separated list of METS physical ``page`` IDs to process \
                 (or empty for all pages). \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        """
        if ocrd_tool is not None:
            deprecation_warning("Passing 'ocrd_tool' as keyword argument to Processor is deprecated - "
                                "use or override metadata/executable/ocrd-tool properties instead")
            self.ocrd_tool = ocrd_tool
            self.executable = ocrd_tool['executable']
        if version is not None:
            deprecation_warning("Passing 'version' as keyword argument to Processor is deprecated - "
                                "use or override metadata/version properties instead")
            self.version = version
        if workspace is not None:
            deprecation_warning("Passing a workspace argument other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.workspace = workspace
            self.old_pwd = getcwd()
            os.chdir(self.workspace.directory)
        if input_file_grp is not None:
            deprecation_warning("Passing an input_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.input_file_grp = input_file_grp
        if output_file_grp is not None:
            deprecation_warning("Passing an output_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.output_file_grp = output_file_grp
        if page_id is not None:
            deprecation_warning("Passing a page_id kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.page_id = page_id or None
        self.download = config.OCRD_DOWNLOAD_INPUT
        #: The logger to be used by processor implementations.
        # `ocrd.processor.base` internals should use :py:attr:`self._base_logger`
        self.logger = getLogger(f'ocrd.processor.{self.__class__.__name__}')
        self._base_logger = getLogger('ocrd.processor.base')
        if parameter is not None:
            self.parameter = parameter
        # workaround for deprecated#72 (@deprecated decorator does not work for subclasses):
        setattr(self, 'process', deprecated(
            version='3.0', reason='process() should be replaced '
            'with process_page_pcgts() or process_page_file() or process_workspace()')(
                getattr(self, 'process')))

    def __del__(self):
        self._base_logger.debug("shutting down %s in %s", repr(self), mp.current_process().name)
        self.shutdown()

    def show_help(self, subcommand=None):
        """
        Print a usage description including the standard CLI and all of this processor's ocrd-tool
        parameters and docstrings.
        """
        print(generate_processor_help(self.ocrd_tool, processor_instance=self, subcommand=subcommand))

    def show_version(self):
        """
        Print information on this processor's version and OCR-D version.
        """
        print("Version %s, ocrd/core %s" % (self.version, OCRD_VERSION))

    def verify(self):
        """
        Verify that :py:attr:`input_file_grp` and :py:attr:`output_file_grp` fulfill the processor's requirements.
        """
        # verify input and output file groups in parameters
        assert self.input_file_grp is not None
        assert self.output_file_grp is not None
        input_file_grps = self.input_file_grp.split(',')
        output_file_grps = self.output_file_grp.split(',')

        def assert_file_grp_cardinality(grps: List[str], spec: Union[int, List[int]], msg):
            if isinstance(spec, int):
                if spec > 0:
                    assert len(grps) == spec, msg % (len(grps), str(spec))
            else:
                assert isinstance(spec, list)
                minimum = spec[0]
                maximum = spec[1]
                if minimum > 0:
                    assert len(grps) >= minimum, msg % (len(grps), str(spec))
                if maximum > 0:
                    assert len(grps) <= maximum, msg % (len(grps), str(spec))
        # FIXME: enforce unconditionally as soon as grace period for deprecation is over
        if 'input_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(input_file_grps, self.ocrd_tool['input_file_grp_cardinality'],
                                        "Unexpected number of input file groups %d vs %s")
        if 'output_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(output_file_grps, self.ocrd_tool['output_file_grp_cardinality'],
                                        "Unexpected number of output file groups %d vs %s")
        # verify input and output file groups in METS
        for input_file_grp in input_file_grps:
            assert input_file_grp in self.workspace.mets.file_groups, \
                f"input fileGrp {input_file_grp} does not exist in workspace {self.workspace}"
        for output_file_grp in output_file_grps:
            assert (output_file_grp not in self.workspace.mets.file_groups
                    or config.OCRD_EXISTING_OUTPUT in ['OVERWRITE', 'SKIP']
                    or not any(self.workspace.mets.find_files(
                        pageId=self.page_id, fileGrp=output_file_grp))), \
                    f"output fileGrp {output_file_grp} already exists in workspace {self.workspace}"
        # keep this for backwards compatibility:
        return True

    def dump_json(self):
        """
        Print :py:attr:`ocrd_tool` on stdout.
        """
        print(json.dumps(self.ocrd_tool, indent=True))

    def dump_module_dir(self):
        """
        Print :py:attr:`moduledir` on stdout.
        """
        print(self.moduledir)

    def list_resources(self):
        """
        Find all installed resource files in the search paths and print their path names.
        """
        for res in self.list_all_resources():
            print(res)

    def setup(self) -> None:
        """
        Prepare the processor for actual data processing,
        prior to changing to the workspace directory but
        after parsing parameters.

        (Override this to load models into memory etc.)
        """
        pass

    def shutdown(self) -> None:
        """
        Bring down the processor after data processing,
        after changing back from the workspace directory but
        before exiting (or setting up with different parameters).

        (Override this to unload models from memory etc.)
        """
        pass

    @deprecated(version='3.0', reason='process() should be replaced '
                'with process_page_pcgts() or process_page_file() or process_workspace()')
    def process(self) -> None:
        """
        Process all files of the :py:data:`workspace`
        from the given :py:data:`input_file_grp`
        to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        (This contains the main functionality and needs to be
        overridden by subclasses.)
        """
        raise NotImplementedError()

    def process_workspace(self, workspace: Workspace) -> None:
        """
        Process all files of the given ``workspace``,
        from the given :py:data:`input_file_grp`
        to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        Delegates to :py:meth:`.process_workspace_submit_tasks`
        and :py:meth:`.process_workspace_handle_tasks`.

        (This will iterate over pages and files, calling
        :py:meth:`.process_page_file` and handling exceptions.
        It should be overridden by subclasses to handle cases
        like post-processing or computation across pages.)
        """
        with pushd_popd(workspace.directory):
            self.workspace = workspace
            self.verify()
            try:
                # set up multitasking
                max_workers = max(0, config.OCRD_MAX_PARALLEL_PAGES)
                if self.max_workers > 0 and self.max_workers < config.OCRD_MAX_PARALLEL_PAGES:
                    self._base_logger.info("limiting number of workers from %d to %d", max_workers, self.max_workers)
                    max_workers = self.max_workers
                if max_workers > 1:
                    assert isinstance(workspace.mets, ClientSideOcrdMets), \
                        "OCRD_MAX_PARALLEL_PAGES>1 requires also using --mets-server-url"
                max_seconds = max(0, config.OCRD_PROCESSING_PAGE_TIMEOUT)
                if self.max_page_seconds > 0 and self.max_page_seconds < config.OCRD_PROCESSING_PAGE_TIMEOUT:
                    self._base_logger.info("limiting page timeout from %d to %d sec", max_seconds, self.max_page_seconds)
                    max_seconds = self.max_page_seconds

                if max_workers > 1:
                    executor_cls = ProcessPoolExecutor
                    log_queue = mp.get_context('fork').Queue()
                else:
                    executor_cls = DummyExecutor
                    log_queue = None
                executor = executor_cls(
                    max_workers=max_workers or 1,
                    # only forking method avoids pickling
                    context=mp.get_context('fork'),
                    # share processor instance as global to avoid pickling
                    initializer=_page_worker_set_ctxt,
                    initargs=(self, log_queue),
                )
                if max_workers > 1:
                    # forward messages from log queue (in subprocesses) to all root handlers
                    log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers,
                                                                  respect_handler_level=True)
                    log_listener.start()
                tasks = None
                try:
                    self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
                    tasks = self.process_workspace_submit_tasks(executor, max_seconds)
                    stats = self.process_workspace_handle_tasks(tasks)
                finally:
                    executor.shutdown(kill_workers=True, wait=False)
                    self._base_logger.debug("stopped executor %s after %d tasks", str(executor), len(tasks) if tasks else -1)
                    if max_workers > 1:
                        # can cause deadlock:
                        #log_listener.stop()
                        # not much better:
                        #log_listener.enqueue_sentinel()
                        pass

            except NotImplementedError:
                # fall back to deprecated method
                try:
                    self.process()
                except Exception as err:
                    # suppress the NotImplementedError context
                    raise err from None

    def process_workspace_submit_tasks(self, executor: TExecutor, max_seconds: int) -> Dict[
            TFuture, Tuple[str, List[Optional[OcrdFileType]]]]:
        """
        Look up all input files of the given ``workspace``
        from the given :py:data:`input_file_grp`
        for the given :py:data:`page_id` (or all pages),
        and schedules calling :py:meth:`.process_page_file`
        on them for each page via `executor` (enforcing
        a per-page time limit of `max_seconds`).

        When running with `OCRD_MAX_PARALLEL_PAGES>1` and
        the workspace via METS Server, the executor will fork
        this many worker parallel subprocesses each processing
        one page at a time. (Interprocess communication is
        done via task and result queues.)

        Otherwise, tasks are run sequentially in the
        current process.

        Delegates to :py:meth:`.zip_input_files` to get
        the input files for each page, and then calls
        :py:meth:`.process_workspace_submit_page_task`.

        Returns a dict mapping the per-page tasks
        (i.e. futures submitted to the executor)
        to their corresponding pageId and input files.
        """
        tasks = {}
        for input_file_tuple in self.zip_input_files(on_error='abort', require_first=False):
            task, page_id, input_files = self.process_workspace_submit_page_task(executor, max_seconds, input_file_tuple)
            tasks[task] = (page_id, input_files)
        self._base_logger.debug("submitted %d processing tasks", len(tasks))
        return tasks

    def process_workspace_submit_page_task(self, executor: TExecutor, max_seconds: int,
                                           input_file_tuple: List[Optional[OcrdFileType]]) -> Tuple[
                                               TFuture, str, List[Optional[OcrdFileType]]]:
        """
        Ensure all input files for a single page are
        downloaded to the workspace, then schedule
        :py:meth:`.process_page_file` to be run on
        them via `executor` (enforcing a per-page time
        limit of `max_seconds`).

        Delegates to :py:meth:`.process_page_file`
        (wrapped in :py:func:`_page_worker` to share
        the processor instance across forked processes).

        \b
        Returns a tuple of:
        - the scheduled future object,
        - the corresponding pageId,
        - the corresponding input files.
        """
        input_files: List[Optional[OcrdFileType]] = [None] * len(input_file_tuple)
        page_id = next(input_file.pageId
                       for input_file in input_file_tuple
                       if input_file)
        self._base_logger.info(f"preparing page {page_id}")
        for i, input_file in enumerate(input_file_tuple):
            if input_file is None:
                # file/page not found in this file grp
                continue
            input_files[i] = input_file
            if not self.download:
                continue
            try:
                input_files[i] = self.workspace.download_file(input_file)
            except (ValueError, FileNotFoundError, HTTPError) as e:
                self._base_logger.error(repr(e))
                self._base_logger.warning(f"failed downloading file {input_file} for page {page_id}")
        # process page
        #executor.submit(self.process_page_file, *input_files)
        return executor.submit(_page_worker, max_seconds, *input_files), page_id, input_files

    def process_workspace_handle_tasks(self, tasks: Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]) -> Tuple[
            int, int, Dict[str, int], int]:
        """
        Look up scheduled per-page futures one by one,
        handle errors (exceptions) and gather results.

        \b
        Enforces policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns a tuple of:
        - the number of successfully processed pages
        - the number of failed (i.e. skipped or copied) pages
        - a dict of the type and corresponding number of exceptions seen
        - the number of total requested pages (i.e. success+fail+existing).

        Delegates to :py:meth:`.process_workspace_handle_page_task`
        for each page.
        """
        # aggregate info for logging:
        nr_succeeded = 0
        nr_failed = 0
        nr_errors = defaultdict(int)  # count causes
        if config.OCRD_MISSING_OUTPUT == 'SKIP':
            reason = "skipped"
        elif config.OCRD_MISSING_OUTPUT == 'COPY':
            reason = "fallback-copied"
        for task in tasks:
            # wait for results, handle errors
            page_id, input_files = tasks[task]
            result = self.process_workspace_handle_page_task(page_id, input_files, task)
            if isinstance(result, Exception):
                nr_errors[result.__class__.__name__] += 1
                nr_failed += 1
                # FIXME: this is just prospective, because len(tasks)==nr_failed+nr_succeeded is not guaranteed
                if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / len(tasks) > config.OCRD_MAX_MISSING_OUTPUTS:
                    # already irredeemably many failures, stop short
                    nr_errors = dict(nr_errors)
                    raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_failed+nr_succeeded}, "
                                    f"{str(nr_errors)})")
            elif result:
                nr_succeeded += 1
            # else skipped - already exists
        nr_errors = dict(nr_errors)
        nr_all = nr_succeeded + nr_failed
        if nr_failed > 0:
            if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / nr_all > config.OCRD_MAX_MISSING_OUTPUTS:
                raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_all}, {str(nr_errors)})")
            self._base_logger.warning("%s %d of %d pages due to %s", reason, nr_failed, nr_all, str(nr_errors))
        self._base_logger.debug("succeeded %d, missed %d of %d pages due to %s",
                                nr_succeeded, nr_failed, nr_all, str(nr_errors))
        return nr_succeeded, nr_failed, nr_errors, len(tasks)

    def process_workspace_handle_page_task(self, page_id: str, input_files: List[Optional[OcrdFileType]],
                                           task: TFuture) -> Union[bool, Exception]:
        """
        \b
        Await a single page result and handle errors (exceptions),
        enforcing policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns
        - true in case of success
        - false in case the output already exists
        - the exception in case of failure
        """
        # FIXME: differentiate error cases in various ways:
        # - ResourceNotFoundError → use ResourceManager to download (once), then retry
        # - transient (I/O or OOM) error → maybe sleep, retry
        # - persistent (data) error → skip / dummy / raise
        try:
            self._base_logger.debug("waiting for output of task %s (page %s)", task, page_id)
            # timeout kwarg on future is useless: it only raises TimeoutError here,
            # but does not stop the running process/thread, and executor itself
            # offers nothing to that effect:
            # task.result(timeout=max_seconds or None)
            # so we instead applied the timeout within the worker function
            task.result()
            return True
        except NotImplementedError:
            # exclude NotImplementedError, so we can try process() below
            raise
        # handle input failures separately
        except FileExistsError as err:
            if config.OCRD_EXISTING_OUTPUT == 'ABORT':
                raise err
            if config.OCRD_EXISTING_OUTPUT == 'SKIP':
                return False
            if config.OCRD_EXISTING_OUTPUT == 'OVERWRITE':
                # too late here, must not happen
                raise Exception(f"got {err} despite OCRD_EXISTING_OUTPUT==OVERWRITE")
        except KeyboardInterrupt:
            raise
        # broad coverage of output failures (including TimeoutError)
        except Exception as err:
            # FIXME: add re-usable/actionable logging
            if config.OCRD_MISSING_OUTPUT == 'ABORT':
                self._base_logger.error(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
                raise err
            self._base_logger.exception(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
            if config.OCRD_MISSING_OUTPUT == 'SKIP':
                pass
            elif config.OCRD_MISSING_OUTPUT == 'COPY':
                self._copy_page_file(input_files[0])
            else:
                desc = config.describe('OCRD_MISSING_OUTPUT', wrap_text=False, indent_text=False)
                raise ValueError(f"unknown configuration value {config.OCRD_MISSING_OUTPUT} - {desc}")
            return err

    def _copy_page_file(self, input_file: OcrdFileType) -> None:
        """
        Copy the given ``input_file`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`~ocrd_models.OcrdFile` per input fileGrp)
        and add it as if it was a processing result.
        """
        input_pcgts: OcrdPage
        assert isinstance(input_file, get_args(OcrdFileType))
        self._base_logger.debug(f"parsing file {input_file.ID} for page {input_file.pageId}")
        try:
            input_pcgts = page_from_file(input_file)
        except ValueError as err:
            # not PAGE and not an image to generate PAGE for
            self._base_logger.error(f"non-PAGE input for page {input_file.pageId}: {err}")
            return
        output_file_id = make_file_id(input_file, self.output_file_grp)
        input_pcgts.set_pcGtsId(output_file_id)
        self.add_metadata(input_pcgts)
        self.workspace.add_file(
            file_id=output_file_id,
            file_grp=self.output_file_grp,
            page_id=input_file.pageId,
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(input_pcgts),
        )

    def process_page_file(self, *input_files: Optional[OcrdFileType]) -> None:
        """
        Process the given ``input_files`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`.OcrdFile` per input fileGrp)
        under the given :py:data:`.parameter`, and make sure the
        results get added accordingly.

        (This uses :py:meth:`.process_page_pcgts`, but should be overridden by subclasses
        to handle cases like multiple output fileGrps, non-PAGE input etc.)
        """
        input_pcgts: List[Optional[OcrdPage]] = [None] * len(input_files)
        input_pos = next(i for i, input_file in enumerate(input_files) if input_file is not None)
        page_id = input_files[input_pos].pageId
        self._base_logger.info("processing page %s", page_id)
        for i, input_file in enumerate(input_files):
            grp = self.input_file_grp.split(',')[i]
            if input_file is None:
                self._base_logger.debug(f"ignoring missing file for input fileGrp {grp} for page {page_id}")
                continue
            assert isinstance(input_file, get_args(OcrdFileType))
            if not input_file.local_filename:
                self._base_logger.error(f'No local file exists for page {page_id} in file group {grp}')
                if config.OCRD_MISSING_INPUT == 'ABORT':
                    raise MissingInputFile(grp, page_id, input_file.mimetype)
                continue
            self._base_logger.debug(f"parsing file {input_file.ID} for page {page_id}")
            try:
                page_ = page_from_file(input_file)
                assert isinstance(page_, OcrdPage)
                input_pcgts[i] = page_
            except ValueError as err:
                # not PAGE and not an image to generate PAGE for
                self._base_logger.error(f"non-PAGE input for page {page_id}: {err}")
        if not any(input_pcgts):
            self._base_logger.warning(f'skipping page {page_id}')
            return
        output_file_id = make_file_id(input_files[input_pos], self.output_file_grp)
        if input_files[input_pos].fileGrp == self.output_file_grp:
            # input=output fileGrp: re-use ID exactly
            output_file_id = input_files[input_pos].ID
        output_file = next(self.workspace.mets.find_files(ID=output_file_id), None)
        if output_file and config.OCRD_EXISTING_OUTPUT != 'OVERWRITE':
            # short-cut avoiding useless computation:
            raise FileExistsError(
                f"A file with ID=={output_file_id} already exists {output_file} and neither force nor ignore are set"
            )
        result = self.process_page_pcgts(*input_pcgts, page_id=page_id)
        for image_result in result.images:
            image_file_id = f'{output_file_id}_{image_result.file_id_suffix}'
            image_file_path = join(self.output_file_grp, f'{image_file_id}.png')
            if isinstance(image_result.alternative_image, PageType):
                # special case: not an alternative image, but replacing the original image
                # (this is needed by certain processors when the original's coordinate system
                #  cannot or must not be kept)
                image_result.alternative_image.set_imageFilename(image_file_path)
                image_result.alternative_image.set_imageWidth(image_result.pil.width)
                image_result.alternative_image.set_imageHeight(image_result.pil.height)
            elif isinstance(image_result.alternative_image, AlternativeImageType):
                image_result.alternative_image.set_filename(image_file_path)
            elif image_result.alternative_image is None:
                pass  # do not reference in PAGE result
            else:
                raise ValueError(f"process_page_pcgts returned an OcrdPageResultImage of unknown type "
                                 f"{type(image_result.alternative_image)}")
            self.workspace.save_image_file(
                image_result.pil,
                image_file_id,
                self.output_file_grp,
                page_id=page_id,
                file_path=image_file_path,
            )
        result.pcgts.set_pcGtsId(output_file_id)
        self.add_metadata(result.pcgts)
        self.workspace.add_file(
            file_id=output_file_id,
            file_grp=self.output_file_grp,
            page_id=page_id,
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(result.pcgts),
        )

    def process_page_pcgts(self, *input_pcgts: Optional[OcrdPage], page_id: Optional[str] = None) -> OcrdPageResult:
        """
        Process the given ``input_pcgts`` of the :py:data:`.workspace`,
        representing one physical page (passed as one parsed
        :py:class:`.OcrdPage` per input fileGrp)
        under the given :py:data:`.parameter`, and return the
        resulting :py:class:`.OcrdPageResult`.

        Optionally, add to the ``images`` attribute of the resulting
        :py:class:`.OcrdPageResult` instances of :py:class:`.OcrdPageResultImage`,
        which have required fields for ``pil`` (:py:class:`PIL.Image` image data),
        ``file_id_suffix`` (used for generating IDs of the saved image) and
        ``alternative_image`` (reference of the :py:class:`ocrd_models.ocrd_page.AlternativeImageType`
        for setting the filename of the saved image).

        (This contains the main functionality and must be overridden by subclasses,
        unless it does not get called by some overridden :py:meth:`.process_page_file`.)
        """
        raise NotImplementedError()

    def add_metadata(self, pcgts: OcrdPage) -> None:
        """
        Add PAGE-XML :py:class:`~ocrd_models.ocrd_page.MetadataItemType` ``MetadataItem`` describing
        the processing step and runtime parameters to :py:class:`.OcrdPage` ``pcgts``.
        """
        metadata_obj = pcgts.get_Metadata()
        assert metadata_obj is not None
        metadata_item = MetadataItemType(
            type_="processingStep",
            name=self.ocrd_tool['steps'][0],
            value=self.ocrd_tool['executable'],
            Labels=[LabelsType(
                externalModel="ocrd-tool",
                externalId="parameters",
                Label=[LabelType(type_=name,
                                 value=self.parameter[name])
                       for name in self.parameter.keys()]),
                    LabelsType(
                        externalModel="ocrd-tool",
                        externalId="version",
                        Label=[LabelType(type_=self.ocrd_tool['executable'],
                                         value=self.version),
                               LabelType(type_='ocrd/core',
                                         value=OCRD_VERSION)])
            ])
        metadata_obj.add_MetadataItem(metadata_item)

    def resolve_resource(self, val):
        """
        Resolve a resource name to an absolute file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_

        Args:
            val (string): resource value to resolve
        """
        executable = self.ocrd_tool['executable']
        if exists(val):
            self._base_logger.debug("Resolved to absolute path %s" % val)
            return val
        # FIXME: remove once workspace arg / old_pwd is gone:
        if hasattr(self, 'old_pwd'):
            cwd = self.old_pwd
        else:
            cwd = getcwd()
        ret = list(filter(exists, list_resource_candidates(executable, val,
                                                           cwd=cwd, moduled=self.moduledir)))
        if ret:
            self._base_logger.debug("Resolved %s to absolute path %s" % (val, ret[0]))
            return ret[0]
        raise ResourceNotFoundError(val, executable)

    def show_resource(self, val):
        """
        Resolve a resource name to a file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_,
        then print its contents to stdout.

        Args:
            val (string): resource value to show
        """
        res_fname = self.resolve_resource(val)
        fpath = Path(res_fname)
        if fpath.is_dir():
            with pushd_popd(fpath):
                fileobj = io.BytesIO()
                with tarfile.open(fileobj=fileobj, mode='w:gz') as tarball:
                    tarball.add('.')
                fileobj.seek(0)
                copyfileobj(fileobj, sys.stdout.buffer)
        else:
            sys.stdout.buffer.write(fpath.read_bytes())

    def list_all_resources(self):
        """
        List all resources found in the filesystem and matching content-type by filename suffix
        """
        for res in list_all_resources(self.executable, ocrd_tool=self.ocrd_tool, moduled=self.moduledir):
            res = Path(res)
            yield res.name

    @property
    def module(self):
        """
        The top-level module this processor belongs to.
        """
        # find shortest prefix path that is not just a namespace package
        fqname = ''
        for name in self.__module__.split('.'):
            if fqname:
                fqname += '.'
            fqname += name
            if getattr(sys.modules[fqname], '__file__', None):
                return fqname
        # fall-back
        return self.__module__

    @property
    def moduledir(self):
        """
        The filesystem path of the module directory.
        """
        return resource_filename(self.module, '.')

    @property
    def input_files(self):
        """
        List the input files (for single-valued :py:attr:`input_file_grp`).

        For each physical page:

        - If there is a single PAGE-XML for the page, take it (and forget about all
          other files for that page)
        - Else if there is a single image file, take it (and forget about all other
          files for that page)
        - Otherwise raise an error (complaining that only PAGE-XML warrants
          having multiple images for a single page)

        See `algorithm <https://github.com/cisocrgroup/ocrd_cis/pull/57#issuecomment-656336593>`_

        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` objects.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")
        ret = self.zip_input_files(mimetype=None, on_error='abort')
        if not ret:
            return []
        assert len(ret[0]) == 1, 'Use zip_input_files() instead of input_files when processing multiple input fileGrps'
        return [tuples[0] for tuples in ret]

    def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
        """
        List tuples of input files (for multi-valued :py:attr:`input_file_grp`).

        Processors that expect/need multiple input file groups,
        cannot use :py:data:`input_files`. They must align (zip) input files
        across pages. This includes the case where not all pages
        are equally present in all file groups. It also requires
        making a consistent selection if there are multiple files
        per page.

        Following the OCR-D functional model, this function tries to
        find a single PAGE file per page, or fall back to a single
        image file per page. In either case, multiple matches per page
        are an error (see error handling below).
        This default behaviour can be changed by using a fixed MIME
        type filter via :py:attr:`mimetype`. But still, multiple matching
        files per page are an error.

        Single-page multiple-file errors are handled according to
        :py:attr:`on_error`:

        - if ``skip``, then the page for the respective fileGrp will be
          silently skipped (as if there was no match at all)
        - if ``first``, then the first matching file for the page will be
          silently selected (as if the first was the only match)
        - if ``last``, then the last matching file for the page will be
          silently selected (as if the last was the only match)
        - if ``abort``, then an exception will be raised.

        Multiple matches for PAGE-XML will always raise an exception.

        Keyword Args:
             require_first (boolean): If true, then skip a page entirely
                 whenever it is not available in the first input `fileGrp`.
             on_error (string): How to handle multiple file matches per page.
             mimetype (string): If not `None`, filter by the specified MIME
                 type (literal or regex prefixed by `//`). Otherwise prefer
                 PAGE or image.
        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` tuples.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")

        ifgs = self.input_file_grp.split(",")
        # Iterating over all files repeatedly may seem inefficient at first sight,
        # but the unnecessary OcrdFile instantiations for posterior fileGrp filtering
        # can actually be much more costly than traversing the ltree.
        # This might depend on the number of pages vs number of fileGrps.

        pages = {}
        for i, ifg in enumerate(ifgs):
            files_ = sorted(self.workspace.mets.find_all_files(
                    pageId=self.page_id, fileGrp=ifg, mimetype=mimetype),
                                # sort by MIME type so PAGE comes before images
                                key=lambda file_: file_.mimetype)
            for file_ in files_:
                if not file_.pageId:
                    # ignore document-global files
                    continue
                ift = pages.setdefault(file_.pageId, [None]*len(ifgs))
                if ift[i]:
                    self._base_logger.debug(
                        f"another file {file_.ID} for page {file_.pageId} in input file group {ifg}")
                    # fileGrp has multiple files for this page ID
                    if mimetype:
                        # filter was active, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} of same MIME type {mimetype} - on_error={on_error}")
                        if on_error == 'skip':
                            ift[i] = None
                        elif on_error == 'first':
                            pass  # keep first match
                        elif on_error == 'last':
                            ift[i] = file_
                        elif on_error == 'abort':
                            raise NonUniqueInputFile(ifg, file_.pageId, mimetype)
                        else:
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype != MIMETYPE_PAGE):
                        pass  # keep PAGE match
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype == MIMETYPE_PAGE):
                        raise NonUniqueInputFile(ifg, file_.pageId, None)
                    else:
                        # filter was inactive but no PAGE is in control, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} but no PAGE available - on_error={on_error}")
                        if on_error == 'skip':
                            ift[i] = None
                        elif on_error == 'first':
                            pass  # keep first match
                        elif on_error == 'last':
                            ift[i] = file_
                        elif on_error == 'abort':
                            raise NonUniqueInputFile(ifg, file_.pageId, None)
                        else:
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
                else:
                    self._base_logger.debug(f"adding file {file_.ID} for page {file_.pageId} to input file group {ifg}")
                    ift[i] = file_
        # Warn if no files found but pageId was specified, because that might be due to invalid page_id (range)
        if self.page_id and not any(pages):
            self._base_logger.critical(f"Could not find any files for selected pageId {self.page_id}.\n"
                                       f"compare '{self.page_id}' with the output of 'ocrd workspace list-page'.")
1132
        ifts = []
1133
        # use physical page order
1134
        for page in self.workspace.mets.physical_pages:
1135
            if page not in pages:
1136
                continue
1137
            ifiles = pages[page]
1138
            for i, ifg in enumerate(ifgs):
1139
                if not ifiles[i]:
1140
                    # could be from non-unique with on_error=skip or from true gap
1141
                    self._base_logger.error(f'Found no file for page {page} in file group {ifg}')
1142
                    if config.OCRD_MISSING_INPUT == 'ABORT':
1143
                        raise MissingInputFile(ifg, page, mimetype)
1144
            if not any(ifiles):
1145
                # must be from non-unique with on_error=skip
1146
                self._base_logger.warning(f'Found no files for {page} - skipping')
1147
                continue
1148
            if ifiles[0] or not require_first:
1149
                ifts.append(tuple(ifiles))
1150
        return ifts
1151
1152
1153
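# Illustrative usage sketch, not part of the original module: the method ending
# above appears to be the tail of the Processor helper that zips input files
# page-synchronously across fileGrps (zip_input_files in current ocrd releases
# - an assumption, since the method signature lies outside this excerpt).
# A processor implementation would consume the returned tuples roughly like this:
#
#   for input_file_tuple in self.zip_input_files(mimetype=MIMETYPE_PAGE,
#                                                on_error='skip'):
#       # one OcrdFile (or None, for a skipped conflict) per input fileGrp,
#       # in physical page order
#       first_file = input_file_tuple[0]
#       pcgts = page_from_file(first_file)

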
_page_worker_processor = None
"""
This global binding for the processor is required to avoid
squeezing the processor through a mp.Queue (which is impossible
due to unpicklable attributes like .workspace.mets._tree anyway)
when calling Processor.process_page_file as page worker processes
in Processor.process_workspace. Forking allows inheriting global
objects, and with the METS Server we do not mutate the local
processor instance anyway.
"""


def _page_worker_set_ctxt(processor, log_queue):
    """
    Overwrites `ocrd.processor.base._page_worker_processor` instance
    for sharing with subprocesses in ProcessPoolExecutor initializer.
    """
    global _page_worker_processor
    _page_worker_processor = processor
    if log_queue:
        # replace all log handlers with just one queue handler
        logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]


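# Illustrative sketch of the intended call site (an assumption, since
# process_workspace lies outside this excerpt): the initializer above is
# passed to the loky ProcessPoolExecutor so that forked page workers inherit
# the processor instance and funnel all their log records into one queue,
# drained by a QueueListener in the parent. `processor` and `max_workers`
# are hypothetical local names here:
#
#   log_queue = mp.get_context('fork').Queue()
#   listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers)
#   listener.start()
#   executor = ProcessPoolExecutor(max_workers=max_workers,
#                                  context=mp.get_context('fork'),
#                                  initializer=_page_worker_set_ctxt,
#                                  initargs=(processor, log_queue))

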
def _page_worker(timeout, *input_files):
    """
    Wraps a `Processor.process_page_file` call as payload (call target)
    of the ProcessPoolExecutor workers, but also enforces the given timeout.
    """
    page_id = next((file.pageId for file in input_files
                    if hasattr(file, 'pageId')), "")
    pool = ThreadPool(processes=1)
    try:
        # run the actual call on a helper thread, so the timeout can be
        # enforced via the async result instead of blocking indefinitely
        async_result = pool.apply_async(_page_worker_processor.process_page_file, input_files)
        async_result.get(timeout or None)
        _page_worker_processor.logger.debug("page worker completed for page %s", page_id)
    except mp.TimeoutError:
        _page_worker_processor.logger.debug("page worker timed out for page %s", page_id)
        raise


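# Illustrative sketch of a driver loop (hypothetical, not verbatim from
# process_workspace): page jobs are submitted through the wrapper above, and
# the per-page timeout then surfaces as mp.TimeoutError, re-raised from the
# worker when the Future is resolved:
#
#   future = executor.submit(_page_worker, timeout, *input_file_tuple)
#   try:
#       future.result()
#   except mp.TimeoutError:
#       processor.logger.error("skipping page after %d seconds", timeout)

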
def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
    """Generate a string describing the full CLI of this processor including params.

    Args:
        ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
        processor_instance (object, optional): the processor implementation
            (for adding any module/class/function docstrings)
        subcommand (string, optional): 'worker' or 'server'
    """
    doc_help = ''
    if processor_instance:
        module = inspect.getmodule(processor_instance)
        if module and module.__doc__:
            doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
        if processor_instance.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
        # Try to find the most concrete docstring among the various methods that an implementation
        # could overload, first come, first served.
        # In doing so, compare with Processor to avoid a glitch in the way py>=3.5 inherits docstrings.
        # (Subclasses are supposed to only repeat information via inspect.getdoc, rather than inherit
        # __doc__ itself.)
        for method in ['process_page_pcgts', 'process_page_file', 'process_workspace', 'process']:
            instance_method = getattr(processor_instance, method)
            superclass_method = getattr(Processor, method)
            if instance_method.__doc__ and instance_method.__doc__ != superclass_method.__doc__:
                doc_help += '\n' + inspect.cleandoc(instance_method.__doc__) + '\n'
                break
        if doc_help:
            doc_help = '\n\n' + wrap_text(doc_help, width=72,
                                          initial_indent='  > ',
                                          subsequent_indent='  > ',
                                          preserve_paragraphs=True)
    subcommands = '''\
    worker      Start a processing worker rather than do local processing
    server      Start a processor server rather than do local processing
'''

    processing_worker_options = '''\
  --queue                         The RabbitMQ server address in format
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
                                  [amqp://admin:admin@localhost:5672]
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
  --log-filename                  Filename to redirect STDOUT/STDERR to,
                                  if specified.
'''

    processing_server_options = '''\
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
'''

    processing_options = '''\
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
  -I, --input-file-grp USE        File group(s) used as input
  -O, --output-file-grp USE       File group(s) used as output
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
  --overwrite                     Remove existing output pages/images
                                  (with "--page-id", remove only those).
                                  Short-hand for OCRD_EXISTING_OUTPUT=OVERWRITE
  --debug                         Abort on any errors with full stack trace.
                                  Short-hand for OCRD_MISSING_OUTPUT=ABORT
  --profile                       Enable profiling
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
                                  or JSON file path
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                  taking precedence over --parameter
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
                                  If URL starts with http:// start an HTTP server there,
                                  otherwise URL is a path to an on-demand-created unix socket
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
                                  Override log level globally [INFO]
  --log-filename LOG-PATH         File to redirect stderr logging to (overriding ocrd_logging.conf).
'''

    information_options = '''\
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
  -L, --list-resources            List names of processor resources
  -J, --dump-json                 Dump tool description as JSON
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
  -h, --help                      Show this message
  -V, --version                   Show version
'''

    parameter_help = ''
    if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']:
        parameter_help = '  NONE\n'
    else:
        def wrap(s):
            return wrap_text(s, initial_indent=' ' * 3,
                             subsequent_indent=' ' * 4,
                             width=72, preserve_paragraphs=True)
        for param_name, param in ocrd_tool['parameters'].items():
            parameter_help += wrap('"%s" [%s%s]' % (
                param_name,
                param['type'],
                ' - REQUIRED' if 'required' in param and param['required'] else
                ' - %s' % json.dumps(param['default']) if 'default' in param else ''))
            parameter_help += '\n ' + wrap(param['description'])
            if 'enum' in param:
                parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum']))
            parameter_help += "\n"

    if not subcommand:
        return f'''\
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]

  {ocrd_tool['description']}{doc_help}

Subcommands:
{subcommands}
Options for processing:
{processing_options}
Options for information:
{information_options}
Parameters:
{parameter_help}
'''
    elif subcommand == 'worker':
        return f'''\
Usage: {ocrd_tool['executable']} worker [OPTIONS]

  Run {ocrd_tool['executable']} as a processing worker.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_worker_options}
'''
    elif subcommand == 'server':
        # presumably mirrors the 'worker' help, given the otherwise unused
        # processing_server_options above and the [worker|server] usage line
        return f'''\
Usage: {ocrd_tool['executable']} server [OPTIONS]

  Run {ocrd_tool['executable']} as a processing server.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_server_options}
'''
    else:
        # fail loudly instead of silently returning None for unknown subcommands
        raise ValueError(f"Unknown subcommand '{subcommand}'")
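

# Illustrative usage sketch of the help generator above, with a hypothetical
# minimal tool description (demo_tool is not a real ocrd-tool.json entry):
#
#   demo_tool = {
#       'executable': 'ocrd-example',
#       'description': 'An example processor',
#       'parameters': {
#           'level': {
#               'type': 'string',
#               'description': 'Segmentation level to operate on',
#               'enum': ['page', 'region', 'line'],
#               'default': 'page'}}}
#   print(generate_processor_help(demo_tool))                       # main help
#   print(generate_processor_help(demo_tool, subcommand='worker'))  # worker help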