Passed
Push — master ( fd842e...236931 )
by Konstantin
02:19
created

ocrd.processor.base   F

Complexity

Total Complexity 184

Size/Duplication

Total Lines 1338
Duplicated Lines 1.49 %

Importance

Changes 0
Metric Value
wmc 184
eloc 653
dl 20
loc 1338
rs 1.947
c 0
b 0
f 0

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like ocrd.processor.base often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Processor base class and helper functions.
3
"""
4
5
__all__ = [
6
    'Processor',
7
    'generate_processor_help',
8
    'run_cli',
9
    'run_processor'
10
]
11
12
from functools import cached_property
13
from os.path import exists, join
14
from shutil import copyfileobj
15
import json
16
import os
17
from os import getcwd
18
from pathlib import Path
19
from typing import Dict, List, Optional, Tuple, Union, get_args
20
import sys
21
import logging
22
import logging.handlers
23
import inspect
24
import tarfile
25
import io
26
from collections import defaultdict
27
from frozendict import frozendict
28
# concurrent.futures cannot timeout-kill workers
29
from pebble import ProcessFuture as Future, ProcessPool as ProcessPoolExecutor
30
from concurrent.futures import TimeoutError
31
import multiprocessing as mp
32
import threading
33
34
from cysignals import alarm
35
from click import wrap_text
36
from deprecated import deprecated
37
from requests import HTTPError
38
39
from ..workspace import Workspace
40
from ..mets_server import ClientSideOcrdMets
41
from ocrd_models.ocrd_file import OcrdFileType
42
from .ocrd_page_result import OcrdPageResult
43
from ocrd_utils import (
44
    VERSION as OCRD_VERSION,
45
    MIMETYPE_PAGE,
46
    config,
47
    getLogger,
48
    list_resource_candidates,
49
    list_all_resources,
50
    get_processor_resource_types,
51
    resource_filename,
52
    parse_json_file_with_comments,
53
    pushd_popd,
54
    make_file_id,
55
    deprecation_warning
56
)
57
from ocrd_validators import ParameterValidator
58
from ocrd_models.ocrd_page import (
59
    PageType,
60
    AlternativeImageType,
61
    MetadataItemType,
62
    LabelType,
63
    LabelsType,
64
    OcrdPage,
65
    to_xml,
66
)
67
from ocrd_modelfactory import page_from_file
68
from ocrd_validators.ocrd_tool_validator import OcrdToolValidator
69
70
# XXX imports must remain for backwards-compatibility
71
from .helpers import run_cli, run_processor  # pylint: disable=unused-import
72
73
74
class ResourceNotFoundError(FileNotFoundError):
    """
    Raised when a requested processor resource cannot be resolved.

    Carries the resource ``name`` and the processor ``executable`` so that
    callers can suggest the matching ``ocrd resmgr download`` invocation.
    """
    def __init__(self, name, executable):
        self.name = name
        self.executable = executable
        # keep the full text around for programmatic access, too
        self.message = (f"Could not find resource '{name}' for executable '{executable}'. "
                        f"Try 'ocrd resmgr download {executable} {name}' to download this resource.")
        super().__init__(self.message)
85
86
87
class NonUniqueInputFile(ValueError):
    """
    Raised when the given fileGrp / pageId / mimetype selector matches more
    than one PAGE file, or (lacking PAGE files) more than one image, or more
    than one file of the requested mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        self.message = (f"Could not determine unique input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
        super().__init__(self.message)
100
101
102
class MissingInputFile(ValueError):
    """
    Raised when the given fileGrp / pageId / mimetype selector matches no
    PAGE file, no PAGE and no image file, or no file of the requested
    mimetype at all.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        self.message = (f"Could not find input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
        super().__init__(self.message)
115
116
117
class IncompleteProcessorImplementation(NotImplementedError):
    """
    Raised when a Processor subclass is incomplete, i.e. it overrides
    neither :py:meth:`Processor.process_page_pcgts()` nor
    :py:meth:`Processor.process()`.
    """
123
124
class DummyFuture:
    """
    Minimal stand-in for parts of `concurrent.futures.Future`:
    stores a call and executes it synchronously on demand.
    """
    def __init__(self, fn, *args, **kwargs):
        # remember the call, do not execute yet
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def result(self):
        """Run the stored callable now and return its value (may raise)."""
        return self.fn(*self.args, **self.kwargs)
135
136
137
class DummyExecutor:
    """
    Minimal stand-in for parts of `concurrent.futures.ProcessPoolExecutor`
    which runs everything synchronously in the current process.
    """
    def __init__(self, initializer=None, initargs=(), **kwargs):
        # no pool to spin up - just run the worker initializer in-process
        initializer(*initargs)

    def stop(self):
        """Reset the worker context so gc can reclaim the processor instance (unless cached)."""
        _page_worker_set_ctxt(None, None)

    def join(self, **kwargs):
        """Nothing to wait for - all work already happened synchronously."""
        pass

    def schedule(self, fn, args=None, kwargs=None, timeout=None) -> DummyFuture:
        """Wrap `fn` in a :py:class:`DummyFuture`, to be executed lazily on `.result()`."""
        return DummyFuture(fn, *(args or []), **(kwargs or {}), timeout=timeout or 0)
157
158
159
TFuture = Union[DummyFuture, Future]
160
TExecutor = Union[DummyExecutor, ProcessPoolExecutor]
161
162
163
class Processor():
164
    """
165
    A processor is a tool that implements the uniform OCR-D
166
    `command-line interface for run-time data processing <https://ocr-d.de/en/spec/cli>`_.
167
168
    That is, it executes a single workflow step, or a combination of workflow steps,
169
    on the workspace (represented by local METS). It reads input files for all or selected
170
    physical pages of the input fileGrp(s), computes additional annotation, and writes output
171
    files for them into the output fileGrp(s). It may take a number of optional or mandatory
172
    parameters.
173
    """
174
175
    # Class-level tuning knobs: subclasses override these to declare their
    # resource profile; negative values mean "no additional limit".
    max_instances: int = -1
    """
    maximum number of cached instances (ignored if negative), to be applied on top of
    :py:data:`~ocrd_utils.config.OCRD_MAX_PROCESSOR_CACHE` (i.e. whatever is smaller).

    (Override this if you know how many instances fit into memory - GPU / CPU RAM - at once.)
    """

    max_workers: int = -1
    """
    maximum number of processor forks for page-parallel processing (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_MAX_PARALLEL_PAGES` (i.e.
    whatever is smaller).

    (Override this if you know how many pages fit into processing units - GPU shaders / CPU cores
    - at once, or if your class already creates threads prior to forking, e.g. during ``setup``.)
    """

    max_page_seconds: int = -1
    """
    maximum number of seconds may be spent processing a single page (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_PROCESSING_PAGE_TIMEOUT`
    (i.e. whatever is smaller).

    (Override this if you know how costly this processor may be, irrespective of image size
    or complexity of the page.)
    """
202
203
    @property
204
    def metadata_filename(self) -> str:
205
        """
206
        Relative location of the ``ocrd-tool.json`` file inside the package.
207
208
        Used by :py:data:`metadata_location`.
209
210
        (Override if ``ocrd-tool.json`` is not in the root of the module,
211
        e.g. ``namespace/ocrd-tool.json`` or ``data/ocrd-tool.json``).
212
        """
213
        return 'ocrd-tool.json'
214
215
    @cached_property
    def metadata_location(self) -> Path:
        """
        Absolute path of the ``ocrd-tool.json`` file as distributed with the package.

        Used by :py:data:`metadata_rawdict`.

        Raises:
            Exception: if no module prefix with a loadable location can be found.

        (Override if ``ocrd-tool.json`` is not distributed with the Python package.)
        """
        module = inspect.getmodule(self)
        module_tokens = module.__package__.split('.')
        # for namespace packages, we cannot just use the first token:
        # walk prefixes until one has an actual filesystem location
        for i in range(len(module_tokens)):
            prefix = '.'.join(module_tokens[:i + 1])
            if sys.modules[prefix].__spec__.has_location:
                return resource_filename(prefix, self.metadata_filename)
        # FIX: was `raise Exception("... %s", module.__package__)` - Exception
        # does not interpolate printf-style args, so the package name ended up
        # as a separate args-tuple element instead of in the message
        raise Exception(f"cannot find top-level module prefix for {module.__package__}")
232
233
    @cached_property
    def metadata_rawdict(self) -> dict:
        """
        The raw ``ocrd-tool.json`` dict contents of the package, i.e. without
        validation or default expansion.

        Used by :py:data:`metadata`.

        (Override if ``ocrd-tool.json`` is not stored in a file.)
        """
        return parse_json_file_with_comments(self.metadata_location)
243
244
    @cached_property
    def metadata(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of the package, according to the OCR-D
        `spec <https://ocr-d.de/en/spec/ocrd_tool>`_ for processor tools.

        After deserialisation, the dict is validated against the
        `schema <https://ocr-d.de/en/spec/ocrd_tool#definition>`_ and all
        defaults get expanded. Validation failures are logged but not fatal.

        Used by :py:data:`ocrd_tool` and :py:data:`version`.

        (Override if you want to provide metadata programmatically instead of a
        JSON file.)
        """
        metadata = self.metadata_rawdict
        report = OcrdToolValidator.validate(metadata)
        if not report.is_valid:
            # warnings only (no errors) still yield is_valid == False
            severity = 'problematic' if not report.errors else 'invalid'
            self.logger.error(f"The ocrd-tool.json of this processor is {severity}:\n"
                              f"{report.to_xml()}.\nPlease open an issue at {metadata.get('git_url', 'the website')}.")
        return metadata
265
266
    @cached_property
267
    def version(self) -> str:
268
        """
269
        The program version of the package.
270
        Usually the ``version`` part of :py:data:`metadata`.
271
272
        (Override if you do not want to use :py:data:`metadata` lookup
273
        mechanism.)
274
        """
275
        return self.metadata['version']
276
277
    @cached_property
278
    def executable(self) -> str:
279
        """
280
        The executable name of this processor tool. Taken from the runtime
281
        filename.
282
283
        Used by :py:data:`ocrd_tool` for lookup in :py:data:`metadata`.
284
285
        (Override if your entry-point name deviates from the ``executable``
286
        name, or the processor gets instantiated from another runtime.)
287
        """
288
        return os.path.basename(inspect.stack()[-1].filename)
289
290
    @cached_property
291
    def ocrd_tool(self) -> dict:
292
        """
293
        The ``ocrd-tool.json`` dict contents of this processor tool.
294
        Usually the :py:data:`executable` key of the ``tools`` part
295
        of :py:data:`metadata`.
296
297
        (Override if you do not want to use :py:data:`metadata` lookup
298
        mechanism.)
299
        """
300
        return self.metadata['tools'][self.executable]
301
302
    @property
303
    def parameter(self) -> Optional[dict]:
304
        """the runtime parameter dict to be used by this processor"""
305
        if hasattr(self, '_parameter'):
306
            return self._parameter
307
        return None
308
309
    @parameter.setter
310
    def parameter(self, parameter: dict) -> None:
311
        if self.parameter is not None:
312
            self.shutdown()
313
        parameterValidator = ParameterValidator(self.ocrd_tool)
314
        report = parameterValidator.validate(parameter)
315
        if not report.is_valid:
316
            raise ValueError(f'Invalid parameters:\n{report.to_xml()}')
317
        # make parameter dict read-only
318
        self._parameter = frozendict(parameter)
319
        # (re-)run setup to load models etc
320
        self.setup()
321
322
    def __init__(
            self,
            # FIXME: remove in favor of process_workspace(workspace)
            workspace: Optional[Workspace],
            ocrd_tool=None,
            parameter=None,
            input_file_grp=None,
            output_file_grp=None,
            page_id=None,
            version=None
    ):
        """
        Instantiate, but do not setup (neither for processing nor other usage).
        If given, do parse and validate :py:data:`.parameter`.

        Args:
             workspace (:py:class:`~ocrd.Workspace`): The workspace to process. \
                 If not ``None``, then `chdir` to that directory.
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        Keyword Args:
             parameter (string): JSON of the runtime choices for ocrd-tool ``parameters``. \
                 Can be ``None`` even for processing, but then needs to be set before running.
             input_file_grp (string): comma-separated list of METS ``fileGrp`` used for input. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             output_file_grp (string): comma-separated list of METS ``fileGrp`` used for output. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             page_id (string): comma-separated list of METS physical ``page`` IDs to process \
                 (or empty for all pages). \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        """
        # Each deprecated kwarg is only applied if explicitly passed, shadowing
        # the corresponding (cached) property with an instance attribute.
        if ocrd_tool is not None:
            deprecation_warning("Passing 'ocrd_tool' as keyword argument to Processor is deprecated - "
                                "use or override metadata/executable/ocrd-tool properties instead")
            self.ocrd_tool = ocrd_tool
            self.executable = ocrd_tool['executable']
        if version is not None:
            deprecation_warning("Passing 'version' as keyword argument to Processor is deprecated - "
                                "use or override metadata/version properties instead")
            self.version = version
        if workspace is not None:
            deprecation_warning("Passing a workspace argument other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.workspace = workspace
            # remember the previous CWD so callers can restore it later
            self.old_pwd = getcwd()
            os.chdir(self.workspace.directory)
        if input_file_grp is not None:
            deprecation_warning("Passing an input_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.input_file_grp = input_file_grp
        if output_file_grp is not None:
            deprecation_warning("Passing an output_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.output_file_grp = output_file_grp
        if page_id is not None:
            deprecation_warning("Passing a page_id kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            # normalise empty string (= all pages) to None
            self.page_id = page_id or None
        self.download = config.OCRD_DOWNLOAD_INPUT
        #: The logger to be used by processor implementations.
        # `ocrd.processor.base` internals should use :py:attr:`self._base_logger`
        self.logger = getLogger(f'ocrd.processor.{self.__class__.__name__}')
        self._base_logger = getLogger('ocrd.processor.base')
        # must come after logger setup: the parameter setter runs setup()
        if parameter is not None:
            self.parameter = parameter
        # workaround for deprecated#72 (@deprecated decorator does not work for subclasses):
        setattr(self, 'process', deprecated(
            version='3.0', reason='process() should be replaced '
            'with process_page_pcgts() or process_page_file() or process_workspace()')(
                getattr(self, 'process')))
395
396
    def __del__(self):
        # Finalizer-driven cleanup: release models/resources even if the
        # caller never invoked shutdown() explicitly.
        self._base_logger.debug("shutting down %s in %s", repr(self), mp.current_process().name)
        self.shutdown()
399
400
    def show_help(self, subcommand=None):
        """
        Print a usage description to stdout, covering the standard CLI as well
        as all of this processor's ocrd-tool parameters and docstrings.
        """
        help_text = generate_processor_help(self.ocrd_tool, processor_instance=self, subcommand=subcommand)
        print(help_text)
406
407
    def show_version(self):
        """
        Print this processor's version together with the OCR-D core version.
        """
        print(f"Version {self.version}, ocrd/core {OCRD_VERSION}")
412
413
    def verify(self):
        """
        Verify that :py:attr:`input_file_grp` and :py:attr:`output_file_grp` fulfill the processor's requirements.

        Checks fileGrp cardinality against the ocrd-tool spec (if declared),
        that every input fileGrp exists in the workspace METS, and that no
        output fileGrp would be clobbered (unless ``OCRD_EXISTING_OUTPUT``
        permits it). Violations raise :py:exc:`AssertionError`.
        """
        # verify input and output file groups in parameters
        assert self.input_file_grp is not None
        assert self.output_file_grp is not None
        input_file_grps = self.input_file_grp.split(',')
        output_file_grps = self.output_file_grp.split(',')

        # spec is either an exact count (int) or a [minimum, maximum] pair;
        # non-positive values mean "unchecked"
        def assert_file_grp_cardinality(grps: List[str], spec: Union[int, List[int]], msg):
            if isinstance(spec, int):
                if spec > 0:
                    assert len(grps) == spec, msg % (len(grps), str(spec))
            else:
                assert isinstance(spec, list)
                minimum = spec[0]
                maximum = spec[1]
                if minimum > 0:
                    assert len(grps) >= minimum, msg % (len(grps), str(spec))
                if maximum > 0:
                    assert len(grps) <= maximum, msg % (len(grps), str(spec))
        # FIXME: enforce unconditionally as soon as grace period for deprecation is over
        if 'input_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(input_file_grps, self.ocrd_tool['input_file_grp_cardinality'],
                                        "Unexpected number of input file groups %d vs %s")
        if 'output_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(output_file_grps, self.ocrd_tool['output_file_grp_cardinality'],
                                        "Unexpected number of output file groups %d vs %s")
        # verify input and output file groups in METS
        for input_file_grp in input_file_grps:
            assert input_file_grp in self.workspace.mets.file_groups, \
                f"input fileGrp {input_file_grp} does not exist in workspace {self.workspace}"
        # an output fileGrp may pre-exist if it has no files for the selected
        # pages, or if the OCRD_EXISTING_OUTPUT policy allows reuse
        for output_file_grp in output_file_grps:
            assert (output_file_grp not in self.workspace.mets.file_groups
                    or config.OCRD_EXISTING_OUTPUT in ['OVERWRITE', 'SKIP']
                    or not any(self.workspace.mets.find_files(
                        pageId=self.page_id, fileGrp=output_file_grp))), \
                    f"output fileGrp {output_file_grp} already exists in workspace {self.workspace}"
        # keep this for backwards compatibility:
        return True
454
455
    def dump_json(self):
456
        """
457
        Print :py:attr:`ocrd_tool` on stdout.
458
        """
459
        print(json.dumps(self.ocrd_tool, indent=True))
460
461
    def dump_module_dir(self):
        """
        Write the path of :py:attr:`moduledir` to stdout.
        """
        print(self.moduledir)
466
467
    def list_resources(self):
        """
        Print the path name of every installed resource file found in the
        search paths, one per line.
        """
        for resource_path in self.list_all_resources():
            print(resource_path)
473
474
    def setup(self) -> None:
475
        """
476
        Prepare the processor for actual data processing,
477
        prior to changing to the workspace directory but
478
        after parsing parameters.
479
480
        (Override this to load models into memory etc.)
481
        """
482
        pass
483
484
    def shutdown(self) -> None:
485
        """
486
        Bring down the processor after data processing,
487
        after to changing back from the workspace directory but
488
        before exiting (or setting up with different parameters).
489
490
        (Override this to unload models from memory etc.)
491
        """
492
        pass
493
494
    @deprecated(version='3.0', reason='process() should be replaced '
                'with process_page_pcgts() or process_page_file() or process_workspace()')
    def process(self) -> None:
        """
        Process all files of the :py:data:`workspace` from the given
        :py:data:`input_file_grp` to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages) under the given
        :py:data:`parameter`.

        (Deprecated extension point: subclasses used to override this with
        the main functionality; raises
        :py:exc:`IncompleteProcessorImplementation` if not overridden.)
        """
        raise IncompleteProcessorImplementation()
508
509
    def process_workspace(self, workspace: Workspace) -> None:
        """
        Process all files of the given ``workspace``,
        from the given :py:data:`input_file_grp`
        to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        Delegates to :py:meth:`.process_workspace_submit_tasks`
        and :py:meth:`.process_workspace_handle_tasks`.

        (This will iterate over pages and files, calling
        :py:meth:`.process_page_file` and handling exceptions.
        It should be overridden by subclasses to handle cases
        like post-processing or computation across pages.)
        """
        with pushd_popd(workspace.directory):
            self.workspace = workspace
            self.verify()
            try:
                # set up multitasking
                # effective limits = min(env config, class-level override)
                max_workers = max(0, config.OCRD_MAX_PARALLEL_PAGES)
                if self.max_workers > 0 and self.max_workers < config.OCRD_MAX_PARALLEL_PAGES:
                    self._base_logger.info("limiting number of workers from %d to %d", max_workers, self.max_workers)
                    max_workers = self.max_workers
                if max_workers > 1:
                    # parallel page processing needs a METS server to mediate
                    # concurrent METS access across worker processes
                    assert isinstance(workspace.mets, ClientSideOcrdMets), \
                        "OCRD_MAX_PARALLEL_PAGES>1 requires also using --mets-server-url"
                max_seconds = max(0, config.OCRD_PROCESSING_PAGE_TIMEOUT)
                if self.max_page_seconds > 0 and self.max_page_seconds < config.OCRD_PROCESSING_PAGE_TIMEOUT:
                    self._base_logger.info("limiting page timeout from %d to %d sec", max_seconds, self.max_page_seconds)
                    max_seconds = self.max_page_seconds

                # with a METS server, fork a real process pool (plus a queue to
                # funnel subprocess log records back); otherwise run in-process
                if isinstance(workspace.mets, ClientSideOcrdMets):
                    executor_cls = ProcessPoolExecutor
                    log_queue = mp.get_context('fork').Queue()
                else:
                    executor_cls = DummyExecutor
                    log_queue = None
                executor = executor_cls(
                    max_workers=max_workers or 1,
                    # only forking method avoids pickling
                    context=mp.get_context('fork'),
                    # share processor instance as global to avoid pickling
                    initializer=_page_worker_set_ctxt,
                    initargs=(self, log_queue),
                )
                if isinstance(workspace.mets, ClientSideOcrdMets):
                    assert executor.active # ensure pre-forking
                    # forward messages from log queue (in subprocesses) to all root handlers
                    log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers,
                                                                  respect_handler_level=True)
                    log_listener.start()
                tasks = None
                try:
                    self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
                    tasks = self.process_workspace_submit_tasks(executor, max_seconds)
                    stats = self.process_workspace_handle_tasks(tasks)
                finally:
                    executor.stop()
                    executor.join(timeout=3.0) # raises TimeoutError
                    self._base_logger.debug("stopped executor %s after %d tasks", str(executor), len(tasks) if tasks else -1)
                    if isinstance(workspace.mets, ClientSideOcrdMets):
                        # can cause deadlock:
                        #log_listener.stop()
                        # not much better:
                        #log_listener.enqueue_sentinel()
                        pass

            except IncompleteProcessorImplementation:
                # fall back to deprecated method
                try:
                    self.process()
                except Exception as err:
                    # suppress the IncompleteProcessorImplementation context
                    raise err from None
586
    def process_workspace_submit_tasks(self, executor: TExecutor, max_seconds: int) -> Dict[
            TFuture, Tuple[str, List[Optional[OcrdFileType]]]]:
        """
        Look up all input files of the given ``workspace``
        from the given :py:data:`input_file_grp`
        for the given :py:data:`page_id` (or all pages),
        and schedule :py:meth:`.process_page_file` on them,
        page by page, via `executor` (enforcing a per-page
        time limit of `max_seconds`).

        When running with `OCRD_MAX_PARALLEL_PAGES>1` and
        the workspace via METS Server, the executor will have
        forked this many parallel worker subprocesses, each
        processing one page at a time (communicating via task
        and result queues). Otherwise, tasks run sequentially
        in the current process.

        Delegates to :py:meth:`.zip_input_files` for the
        per-page input files, and to
        :py:meth:`.process_workspace_submit_page_task` for
        the actual scheduling.

        Returns a dict mapping each per-page task (i.e. future
        submitted to the executor) to its pageId and input files.
        """
        tasks = {}
        for file_tuple in self.zip_input_files(on_error='abort', require_first=False):
            future, page_id, files = self.process_workspace_submit_page_task(executor, max_seconds, file_tuple)
            tasks[future] = (page_id, files)
        self._base_logger.debug("submitted %d processing tasks", len(tasks))
        return tasks
619
620
    def process_workspace_submit_page_task(self, executor: TExecutor, max_seconds: int,
                                           input_file_tuple: List[Optional[OcrdFileType]]) -> Tuple[
                                               TFuture, str, List[Optional[OcrdFileType]]]:
        """
        Ensure all input files for a single page are
        downloaded to the workspace, then schedule
        :py:meth:`.process_page_file` to be run on
        them via `executor` (enforcing a per-page time
        limit of `max_seconds`).

        Delegates to :py:meth:`.process_page_file`
        (wrapped in :py:func:`_page_worker` to share
        the processor instance across forked processes).

        \b
        Returns a tuple of:
        - the scheduled future object,
        - the corresponding pageId,
        - the corresponding input files.
        """
        input_files: List[Optional[OcrdFileType]] = [None] * len(input_file_tuple)
        # any non-None entry carries the pageId common to the whole tuple
        page_id = next(input_file.pageId
                       for input_file in input_file_tuple
                       if input_file)
        self._base_logger.info(f"preparing page {page_id}")
        for i, input_file in enumerate(input_file_tuple):
            if input_file is None:
                # file/page not found in this file grp
                continue
            input_files[i] = input_file
            if not self.download:
                continue
            try:
                input_files[i] = self.workspace.download_file(input_file)
            except (ValueError, FileNotFoundError, HTTPError) as e:
                # download failure is not fatal here - keep the undownloaded
                # file entry and let per-page processing cope with it
                self._base_logger.error(repr(e))
                self._base_logger.warning(f"failed downloading file {input_file} for page {page_id}")
        # process page
        #executor.submit(self.process_page_file, *input_files)
        return executor.schedule(_page_worker, args=input_files, timeout=max_seconds), page_id, input_files
660
661
    def process_workspace_handle_tasks(self, tasks: Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]) -> Tuple[
            int, int, Dict[str, int], int]:
        """
        Look up scheduled per-page futures one by one,
        handle errors (exceptions) and gather results.

        \b
        Enforces policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns a tuple of:
        - the number of successfully processed pages
        - the number of failed (i.e. skipped or copied) pages
        - a dict of the type and corresponding number of exceptions seen
        - the number of total requested pages (i.e. success+fail+existing).

        Delegates to :py:meth:`.process_workspace_handle_page_task`
        for each page.
        """
        # aggregate info for logging:
        nr_succeeded = 0
        nr_failed = 0
        nr_errors = defaultdict(int)  # count causes
        # Describe how failed pages were dealt with (used in log/error messages).
        # Bind via lookup with a fallback instead of a bare if/elif chain, so
        # `reason` can never be referenced while unbound (previously it stayed
        # unassigned for any policy other than SKIP/COPY, which would raise
        # UnboundLocalError should a failure ever reach the message below).
        reason = {'SKIP': "skipped", 'COPY': "fallback-copied"}.get(
            config.OCRD_MISSING_OUTPUT, "missing")
        for task in tasks:
            # wait for results, handle errors
            page_id, input_files = tasks[task]
            result = self.process_workspace_handle_page_task(page_id, input_files, task)
            if isinstance(result, Exception):
                nr_errors[result.__class__.__name__] += 1
                nr_failed += 1
                # FIXME: this is just prospective, because len(tasks)==nr_failed+nr_succeeded is not guaranteed
                if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / len(tasks) > config.OCRD_MAX_MISSING_OUTPUTS:
                    # already irredeemably many failures, stop short
                    nr_errors = dict(nr_errors)
                    raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_failed+nr_succeeded}, "
                                    f"{str(nr_errors)})")
            elif result:
                nr_succeeded += 1
            # else skipped - already exists
        nr_errors = dict(nr_errors)
        nr_all = nr_succeeded + nr_failed
        if nr_failed > 0:
            if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / nr_all > config.OCRD_MAX_MISSING_OUTPUTS:
                raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_all}, {str(nr_errors)})")
            self._base_logger.warning("%s %d of %d pages due to %s", reason, nr_failed, nr_all, str(nr_errors))
        self._base_logger.debug("succeeded %d, missed %d of %d pages due to %s",
                                nr_succeeded, nr_failed, nr_all, str(nr_errors))
        return nr_succeeded, nr_failed, nr_errors, len(tasks)
717
718
    def process_workspace_handle_page_task(self, page_id: str, input_files: List[Optional[OcrdFileType]],
                                           task: TFuture) -> Union[bool, Exception]:
        """
        \b
        Await a single page result and handle errors (exceptions),
        enforcing policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns
        - true in case of success
        - false in case the output already exists
        - the exception in case of failure
        """
        # FIXME: differentiate error cases in various ways:
        # - ResourceNotFoundError → use ResourceManager to download (once), then retry
        # - transient (I/O or OOM) error → maybe sleep, retry
        # - persistent (data) error → skip / dummy / raise
        try:
            self._base_logger.debug("waiting for output of task %s (page %s)", task, page_id)
            # timeout kwarg on future is useless: it only raises TimeoutError here,
            # but does not stop the running process/thread, and executor itself
            # offers nothing to that effect:
            # task.result(timeout=max_seconds or None)
            # so we instead passed the timeout to the submit (schedule) function
            task.result()
            self._base_logger.debug("page worker completed for page %s", page_id)
            return True
        except IncompleteProcessorImplementation:
            # pass this through, so we can try process() below
            raise
        # handle input failures separately
        except FileExistsError as err:
            # the page worker refused to overwrite a pre-existing output file
            if config.OCRD_EXISTING_OUTPUT == 'ABORT':
                raise err
            if config.OCRD_EXISTING_OUTPUT == 'SKIP':
                return False
            if config.OCRD_EXISTING_OUTPUT == 'OVERWRITE':
                # too late here, must not happen
                raise Exception(f"got {err} despite OCRD_EXISTING_OUTPUT==OVERWRITE")
            # NOTE(review): for any other OCRD_EXISTING_OUTPUT value this falls
            # through and returns None (treated as "skipped" by the caller) —
            # confirm whether an explicit ValueError would be preferable
        except KeyboardInterrupt:
            # never swallow user interruption
            raise
        # broad coverage of output failures (including TimeoutError)
        except (Exception, TimeoutError) as err:
            if isinstance(err, TimeoutError):
                self._base_logger.debug("page worker timed out for page %s", page_id)
            # FIXME: add re-usable/actionable logging
            if config.OCRD_MISSING_OUTPUT == 'ABORT':
                self._base_logger.error(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
                raise err
            # non-aborting policies: log the traceback, then skip or copy
            self._base_logger.exception(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
            if config.OCRD_MISSING_OUTPUT == 'SKIP':
                pass
            elif config.OCRD_MISSING_OUTPUT == 'COPY':
                # fall back to copying the first fileGrp's input as pseudo-result
                self._copy_page_file(input_files[0])
            else:
                desc = config.describe('OCRD_MISSING_OUTPUT', wrap_text=False, indent_text=False)
                raise ValueError(f"unknown configuration value {config.OCRD_MISSING_OUTPUT} - {desc}")
            # report (but do not re-raise) the original failure to the caller
            return err
780
781
    def _copy_page_file(self, input_file: OcrdFileType) -> None:
        """
        Take the given ``input_file`` of the :py:data:`workspace`
        for one physical page (an opened :py:class:`~ocrd_models.OcrdFile`
        from the first input fileGrp) and register a copy of its PAGE
        document as if it had been produced as a processing result.
        """
        assert isinstance(input_file, get_args(OcrdFileType))
        self._base_logger.debug(f"parsing file {input_file.ID} for page {input_file.pageId}")
        try:
            input_pcgts: OcrdPage = page_from_file(input_file)
        except ValueError as err:
            # neither PAGE nor an image we could generate PAGE for
            self._base_logger.error(f"non-PAGE input for page {input_file.pageId}: {err}")
            return
        copy_file_id = make_file_id(input_file, self.output_file_grp)
        input_pcgts.set_pcGtsId(copy_file_id)
        self.add_metadata(input_pcgts)
        self.workspace.add_file(
            file_id=copy_file_id,
            file_grp=self.output_file_grp,
            page_id=input_file.pageId,
            local_filename=os.path.join(self.output_file_grp, copy_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(input_pcgts),
        )
808
809
    def process_page_file(self, *input_files: Optional[OcrdFileType]) -> None:
        """
        Process the given ``input_files`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`.OcrdFile` per input fileGrp)
        under the given :py:data:`.parameter`, and make sure the
        results get added accordingly.

        (This uses :py:meth:`.process_page_pcgts`, but should be overridden by subclasses
        to handle cases like multiple output fileGrps, non-PAGE input etc.)
        """
        input_pcgts: List[Optional[OcrdPage]] = [None] * len(input_files)
        # position of the first present input file (defines page ID and output file IDs)
        input_pos = next(i for i, input_file in enumerate(input_files) if input_file is not None)
        page_id = input_files[input_pos].pageId
        self._base_logger.info("processing page %s", page_id)
        # parse every available input file into a PAGE document
        for i, input_file in enumerate(input_files):
            grp = self.input_file_grp.split(',')[i]
            if input_file is None:
                self._base_logger.debug(f"ignoring missing file for input fileGrp {grp} for page {page_id}")
                continue
            assert isinstance(input_file, get_args(OcrdFileType))
            if not input_file.local_filename:
                # file was never downloaded (see self.download in task preparation)
                self._base_logger.error(f'No local file exists for page {page_id} in file group {grp}')
                if config.OCRD_MISSING_INPUT == 'ABORT':
                    raise MissingInputFile(grp, page_id, input_file.mimetype)
                continue
            self._base_logger.debug(f"parsing file {input_file.ID} for page {page_id}")
            try:
                page_ = page_from_file(input_file)
                assert isinstance(page_, OcrdPage)
                input_pcgts[i] = page_
            except ValueError as err:
                # not PAGE and not an image to generate PAGE for
                self._base_logger.error(f"non-PAGE input for page {page_id}: {err}")
        if not any(input_pcgts):
            # no fileGrp yielded a usable document for this page
            self._base_logger.warning(f'skipping page {page_id}')
            return
        output_file_grps = self.output_file_grp.split(',')
        output_file_ids = [make_file_id(input_files[input_pos], output_file_grp)
                           if input_files[input_pos].fileGrp != output_file_grp else
                           # input=output fileGrp: re-use ID exactly
                           input_files[input_pos].ID
                           for output_file_grp in output_file_grps]
        if config.OCRD_EXISTING_OUTPUT != 'OVERWRITE':
            for output_file_id in output_file_ids:
                if output_file := next(self.workspace.mets.find_files(ID=output_file_id), None):
                    # short-cut avoiding useless computation:
                    raise FileExistsError(
                        f"A file with ID=={output_file_id} already exists {output_file}"
                        " and OCRD_EXISTING_OUTPUT != OVERWRITE"
                    )
        # NOTE(review): `results` is treated as a sequence of OcrdPageResult here
        # (one per output fileGrp), although process_page_pcgts is annotated to
        # return a single OcrdPageResult — confirm against subclass overrides
        results = self.process_page_pcgts(*input_pcgts, page_id=page_id)
        if len(results) > len(output_file_grps):
            self._base_logger.error(f"processor returned {len(results) - len(output_file_grps)} "
                                    f"more results than specified output fileGrps for page {page_id}")
        for result, output_file_id, output_file_grp in zip(results, output_file_ids, output_file_grps):
            # persist any derived images and reference them from the PAGE result
            for image_result in result.images:
                image_file_id = f'{output_file_id}_{image_result.file_id_suffix}'
                image_file_path = join(output_file_grp, f'{image_file_id}.png')
                if isinstance(image_result.alternative_image, PageType):
                    # special case: not an alternative image, but replacing the original image
                    # (this is needed by certain processors when the original's coordinate system
                    #  cannot or must not be kept, e.g. dewarping)
                    image_result.alternative_image.set_imageFilename(image_file_path)
                    image_result.alternative_image.set_imageWidth(image_result.pil.width)
                    image_result.alternative_image.set_imageHeight(image_result.pil.height)
                elif isinstance(image_result.alternative_image, AlternativeImageType):
                    image_result.alternative_image.set_filename(image_file_path)
                elif image_result.alternative_image is None:
                    pass  # do not reference in PAGE result
                else:
                    raise ValueError(f"process_page_pcgts returned an OcrdPageResultImage of unknown type "
                                     f"{type(image_result.alternative_image)}")
                self.workspace.save_image_file(
                    image_result.pil,
                    image_file_id,
                    output_file_grp,
                    page_id=page_id,
                    file_path=image_file_path,
                )
            # stamp the result with its file ID and processing-step metadata,
            # then register it in the METS
            result.pcgts.set_pcGtsId(output_file_id)
            self.add_metadata(result.pcgts)
            self.workspace.add_file(
                file_id=output_file_id,
                file_grp=output_file_grp,
                page_id=page_id,
                local_filename=os.path.join(output_file_grp, output_file_id + '.xml'),
                mimetype=MIMETYPE_PAGE,
                content=to_xml(result.pcgts),
            )
899
900
    def process_page_pcgts(self, *input_pcgts: Optional[OcrdPage], page_id: Optional[str] = None) -> OcrdPageResult:
        """
        Process the given ``input_pcgts`` of the :py:data:`.workspace`,
        representing one physical page (passed as one parsed
        :py:class:`.OcrdPage` per input fileGrp)
        under the given :py:data:`.parameter`, and return the
        resulting :py:class:`.OcrdPageResult`.

        Optionally, add to the ``images`` attribute of the resulting
        :py:class:`.OcrdPageResult` instances of :py:class:`.OcrdPageResultImage`,
        which have required fields for ``pil`` (:py:class:`PIL.Image` image data),
        ``file_id_suffix`` (used for generating IDs of the saved image) and
        ``alternative_image`` (reference of the :py:class:`ocrd_models.ocrd_page.AlternativeImageType`
        for setting the filename of the saved image).

        (This contains the main functionality and must be overridden by subclasses,
        unless it does not get called by some overridden :py:meth:`.process_page_file`.)
        """
        raise IncompleteProcessorImplementation()
919
920
    def add_metadata(self, pcgts: OcrdPage) -> None:
        """
        Append a PAGE-XML :py:class:`~ocrd_models.ocrd_page.MetadataItemType` ``MetadataItem``
        to :py:class:`.OcrdPage` ``pcgts``, recording this processing step
        together with its runtime parameters and version information.
        """
        metadata = pcgts.get_Metadata()
        assert metadata is not None
        # one label per runtime parameter
        parameter_labels = LabelsType(
            externalModel="ocrd-tool",
            externalId="parameters",
            Label=[LabelType(type_=key, value=self.parameter[key])
                   for key in self.parameter.keys()])
        # record both the tool's own version and the core framework version
        version_labels = LabelsType(
            externalModel="ocrd-tool",
            externalId="version",
            Label=[LabelType(type_=self.ocrd_tool['executable'], value=self.version),
                   LabelType(type_='ocrd/core', value=OCRD_VERSION)])
        metadata.add_MetadataItem(MetadataItemType(
            type_="processingStep",
            name=self.ocrd_tool['steps'][0],
            value=self.ocrd_tool['executable'],
            Labels=[parameter_labels, version_labels]))
946
947
    def resolve_resource(self, val):
        """
        Resolve a resource name to an absolute file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_

        Args:
            val (string): resource value to resolve
        """
        executable = self.ocrd_tool['executable']
        # an existing path needs no lookup at all
        if exists(val):
            self._base_logger.debug("Resolved to absolute path %s" % val)
            return val
        # FIXME: remove once workspace arg / old_pwd is gone:
        cwd = self.old_pwd if hasattr(self, 'old_pwd') else getcwd()
        # probe all spec-defined candidate locations, first existing one wins
        candidates = list_resource_candidates(executable, val,
                                              cwd=cwd, moduled=self.moduledir)
        resolved = next(filter(exists, candidates), None)
        if resolved is not None:
            self._base_logger.debug("Resolved %s to absolute path %s" % (val, resolved))
            return resolved
        raise ResourceNotFoundError(val, executable)
970
971
    def show_resource(self, val):
        """
        Resolve a resource name to a file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_,
        then print its contents to stdout.

        Args:
            val (string): resource value to show
        """
        res_path = Path(self.resolve_resource(val))
        if not res_path.is_dir():
            # plain file: dump raw bytes
            sys.stdout.buffer.write(res_path.read_bytes())
            return
        # directory resource: stream it to stdout as a gzipped tarball
        with pushd_popd(res_path):
            buffer = io.BytesIO()
            with tarfile.open(fileobj=buffer, mode='w:gz') as tarball:
                tarball.add('.')
            buffer.seek(0)
            copyfileobj(buffer, sys.stdout.buffer)
991
992
    def list_all_resources(self):
        """
        List all resources found in the filesystem and matching content-type by filename suffix
        """
        # yield only the basenames, not the full candidate paths
        for res_path in list_all_resources(self.executable, ocrd_tool=self.ocrd_tool, moduled=self.moduledir):
            yield Path(res_path).name
999
1000
    @property
1001
    def module(self):
1002
        """
1003
        The top-level module this processor belongs to.
1004
        """
1005
        # find shortest prefix path that is not just a namespace package
1006
        fqname = ''
1007
        for name in self.__module__.split('.'):
1008
            if fqname:
1009
                fqname += '.'
1010
            fqname += name
1011
            if getattr(sys.modules[fqname], '__file__', None):
1012
                return fqname
1013
        # fall-back
1014
        return self.__module__
1015
1016
    @property
    def moduledir(self):
        """
        The filesystem path of the module directory.
        """
        # resolve the top-level module (see self.module) to its on-disk directory
        return resource_filename(self.module, '.')
1022
1023
    @property
    def input_files(self):
        """
        List the input files (for single-valued :py:attr:`input_file_grp`).

        For each physical page:

        - If there is a single PAGE-XML for the page, take it (and forget about all
          other files for that page)
        - Else if there is a single image file, take it (and forget about all other
          files for that page)
        - Otherwise raise an error (complaining that only PAGE-XML warrants
          having multiple images for a single page)

        See `algorithm <https://github.com/cisocrgroup/ocrd_cis/pull/57#issuecomment-656336593>`_

        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` objects.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")
        # delegate to the multi-fileGrp implementation with strict conflict handling
        zipped = self.zip_input_files(mimetype=None, on_error='abort')
        if not zipped:
            return []
        assert len(zipped[0]) == 1, 'Use zip_input_files() instead of input_files when processing multiple input fileGrps'
        # unwrap the single-element tuples
        return [file_tuple[0] for file_tuple in zipped]
1049
1050
    def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
        """
        List tuples of input files (for multi-valued :py:attr:`input_file_grp`).

        Processors that expect/need multiple input file groups,
        cannot use :py:data:`input_files`. They must align (zip) input files
        across pages. This includes the case where not all pages
        are equally present in all file groups. It also requires
        making a consistent selection if there are multiple files
        per page.

        Following the OCR-D functional model, this function tries to
        find a single PAGE file per page, or fall back to a single
        image file per page. In either case, multiple matches per page
        are an error (see error handling below).
        This default behaviour can be changed by using a fixed MIME
        type filter via :py:attr:`mimetype`. But still, multiple matching
        files per page are an error.

        Single-page multiple-file errors are handled according to
        :py:attr:`on_error`:

        - if ``skip``, then the page for the respective fileGrp will be
          silently skipped (as if there was no match at all)
        - if ``first``, then the first matching file for the page will be
          silently selected (as if the first was the only match)
        - if ``last``, then the last matching file for the page will be
          silently selected (as if the last was the only match)
        - if ``abort``, then an exception will be raised.

        Multiple matches for PAGE-XML will always raise an exception.

        Keyword Args:
             require_first (boolean): If true, then skip a page entirely
                 whenever it is not available in the first input `fileGrp`.
             on_error (string): How to handle multiple file matches per page.
             mimetype (string): If not `None`, filter by the specified MIME
                 type (literal or regex prefixed by `//`). Otherwise prefer
                 PAGE or image.
        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` tuples.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")

        ifgs = self.input_file_grp.split(",")
        # Iterating over all files repeatedly may seem inefficient at first sight,
        # but the unnecessary OcrdFile instantiations for posterior fileGrp filtering
        # can actually be much more costly than traversing the ltree.
        # This might depend on the number of pages vs number of fileGrps.

        # map from page ID to one slot per input fileGrp
        pages = {}
        for i, ifg in enumerate(ifgs):
            files_ = sorted(self.workspace.mets.find_all_files(
                                pageId=self.page_id, fileGrp=ifg, mimetype=mimetype),
                            # sort by MIME type so PAGE comes before images
                            key=lambda file_: file_.mimetype)
            for file_ in files_:
                if not file_.pageId:
                    # ignore document-global files
                    continue
                ift = pages.setdefault(file_.pageId, [None] * len(ifgs))
                if ift[i]:
                    self._base_logger.debug(
                        f"another file {file_.ID} for page {file_.pageId} in input file group {ifg}")
                    # fileGrp has multiple files for this page ID
                    if mimetype:
                        # filter was active, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} of same MIME type {mimetype} - on_error={on_error}")
                        if on_error == 'skip':
                            ift[i] = None
                        elif on_error == 'first':
                            pass  # keep first match
                        elif on_error == 'last':
                            ift[i] = file_
                        elif on_error == 'abort':
                            raise NonUniqueInputFile(ifg, file_.pageId, mimetype)
                        else:
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype != MIMETYPE_PAGE):
                        pass  # keep PAGE match
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype == MIMETYPE_PAGE):
                        # multiple PAGE files per page are never tolerated
                        raise NonUniqueInputFile(ifg, file_.pageId, None)
                    else:
                        # filter was inactive but no PAGE is in control, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} but no PAGE available - on_error={on_error}")
                        if on_error == 'skip':
                            ift[i] = None
                        elif on_error == 'first':
                            pass  # keep first match
                        elif on_error == 'last':
                            ift[i] = file_
                        elif on_error == 'abort':
                            raise NonUniqueInputFile(ifg, file_.pageId, None)
                        else:
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
                else:
                    self._base_logger.debug(f"adding file {file_.ID} for page {file_.pageId} to input file group {ifg}")
                    ift[i] = file_
        # Warn if no files found but pageId was specified, because that might be due to invalid page_id (range)
        if self.page_id and not any(pages):
            # (fixed typo in user hint: "ocrd", not "orcd")
            self._base_logger.critical(f"Could not find any files for selected pageId {self.page_id}.\n"
                                       f"compare '{self.page_id}' with the output of 'ocrd workspace list-page'.")
        ifts = []
        # use physical page order
        for page in self.workspace.mets.physical_pages:
            if page not in pages:
                continue
            ifiles = pages[page]
            for i, ifg in enumerate(ifgs):
                if not ifiles[i]:
                    # could be from non-unique with on_error=skip or from true gap
                    self._base_logger.error(f'Found no file for page {page} in file group {ifg}')
                    if config.OCRD_MISSING_INPUT == 'ABORT':
                        raise MissingInputFile(ifg, page, mimetype)
            if not any(ifiles):
                # must be from non-unique with on_error=skip
                self._base_logger.warning(f'Found no files for {page} - skipping')
                continue
            if ifiles[0] or not require_first:
                ifts.append(tuple(ifiles))
        return ifts
1178
1179
1180
_page_worker_processor = None
1181
"""
1182
This global binding for the processor is required to avoid
1183
squeezing the processor through a mp.Queue (which is impossible
1184
due to unpicklable attributes like .workspace.mets._tree anyway)
1185
when calling Processor.process_page_file as page worker processes
1186
in Processor.process_workspace. Forking allows inheriting global
1187
objects, and with the METS Server we do not mutate the local
1188
processor instance anyway.
1189
"""
1190
1191
1192
def _page_worker_set_ctxt(processor, log_queue):
1193
    """
1194
    Overwrites `ocrd.processor.base._page_worker_processor` instance
1195
    for sharing with subprocesses in ProcessPoolExecutor initializer.
1196
    """
1197
    global _page_worker_processor
1198
    _page_worker_processor = processor
1199
    if log_queue:
1200
        # replace all log handlers with just one queue handler
1201
        logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]
1202
1203
1204
def _page_worker(*input_files, timeout=0):
    """
    Payload (call target) for the page-worker pool processes:
    delegates to `Processor.process_page_file` on the globally
    shared processor instance, optionally under a SIGALRM-based
    timeout.
    """
    page_id = next((f.pageId for f in input_files if hasattr(f, 'pageId')), "")
    if timeout:
        # based on setitimer() / SIGALRM - only available on Unix+Cygwin
        # (but we need to interrupt even when in syscalls
        #  and cannot rely on Timer threads, because
        #  processor implementations might not work with threads);
        # the alarm exception/interrupt only ever reaches the main thread
        if threading.current_thread() is not threading.main_thread():
            raise ValueError("cannot apply page worker timeout outside main thread")
        alarm.alarm(timeout)
    try:
        _page_worker_processor.process_page_file(*input_files)
        _page_worker_processor.logger.debug("page worker completed for page %s", page_id)
    except alarm.AlarmInterrupt:
        _page_worker_processor.logger.debug("page worker timed out for page %s", page_id)
        raise TimeoutError
    finally:
        if timeout:
            alarm.cancel_alarm()
1231
1232
1233
def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
    """Generate a string describing the full CLI of this processor including params.

    Args:
        ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
        processor_instance (object, optional): the processor implementation
            (for adding any module/class/function docstrings)
        subcommand (string, optional): 'worker'

    Raises:
        ValueError: if ``subcommand`` is given but unknown
    """
    doc_help = ''
    if processor_instance:
        # prepend module- and class-level docstrings of the implementation, if any
        module = inspect.getmodule(processor_instance)
        if module and module.__doc__:
            doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
        if processor_instance.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
        # Try to find the most concrete docstring among the various methods that an implementation
        # could overload, first serving.
        # In doing so, compare with Processor to avoid a glitch in the way py>=3.5 inherits docstrings.
        # (They are supposed to only repeat information inspect.getdoc, rather than inherit __doc__ itself.)
        for method in ['process_page_pcgts', 'process_page_file', 'process_workspace', 'process']:
            instance_method = getattr(processor_instance, method)
            superclass_method = getattr(Processor, method)
            if instance_method.__doc__ and instance_method.__doc__ != superclass_method.__doc__:
                doc_help += '\n' + inspect.cleandoc(instance_method.__doc__) + '\n'
                break
        if doc_help:
            doc_help = '\n\n' + wrap_text(doc_help, width=72,
                                          initial_indent='  > ',
                                          subsequent_indent='  > ',
                                          preserve_paragraphs=True)
    subcommands = '''\
    worker      Start a processing worker rather than do local processing
'''

    processing_worker_options = '''\
  --queue                         The RabbitMQ server address in format
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
                                  [amqp://admin:admin@localhost:5672]
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
  --log-filename                  Filename to redirect STDOUT/STDERR to,
                                  if specified.
'''

    # NOTE: a help text for a 'server' subcommand (MongoDB options) used to be
    # assembled here but was never interpolated into any output; removed as dead code.

    processing_options = '''\
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
  -I, --input-file-grp USE        File group(s) used as input
  -O, --output-file-grp USE       File group(s) used as output
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
  --overwrite                     Remove existing output pages/images
                                  (with "--page-id", remove only those).
                                  Short-hand for OCRD_EXISTING_OUTPUT=OVERWRITE
  --debug                         Abort on any errors with full stack trace.
                                  Short-hand for OCRD_MISSING_OUTPUT=ABORT
  --profile                       Enable profiling
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
                                  or JSON file path
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                  taking precedence over --parameter
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
                                  If URL starts with http:// start an HTTP server there,
                                  otherwise URL is a path to an on-demand-created unix socket
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
                                  Override log level globally [INFO]
  --log-filename LOG-PATH         File to redirect stderr logging to (overriding ocrd_logging.conf).
'''

    information_options = '''\
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
  -L, --list-resources            List names of processor resources
  -J, --dump-json                 Dump tool description as JSON
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
  -h, --help                      Show this message
  -V, --version                   Show version
'''

    parameter_help = ''
    if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']:
        parameter_help = '  NONE\n'
    else:
        def wrap(s):
            # indent continuation lines one space deeper than the first line
            return wrap_text(s, initial_indent=' ' * 3,
                             subsequent_indent=' ' * 4,
                             width=72, preserve_paragraphs=True)
        for param_name, param in ocrd_tool['parameters'].items():
            # header line: name, type, and either REQUIRED or the JSON default
            parameter_help += wrap('"%s" [%s%s]' % (
                param_name,
                param['type'],
                ' - REQUIRED' if 'required' in param and param['required'] else
                ' - %s' % json.dumps(param['default']) if 'default' in param else ''))
            parameter_help += '\n ' + wrap(param['description'])
            if 'enum' in param:
                parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum']))
            parameter_help += "\n"

    if not subcommand:
        return f'''\
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]

  {ocrd_tool['description']}{doc_help}

Subcommands:
{subcommands}
Options for processing:
{processing_options}
Options for information:
{information_options}
Parameters:
{parameter_help}
'''
    elif subcommand == 'worker':
        return f'''\
Usage: {ocrd_tool['executable']} worker [OPTIONS]

  Run {ocrd_tool['executable']} as a processing worker.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_worker_options}
'''
    else:
        # previously this fell through with `pass`, silently returning None
        # (which callers would then print as the literal string "None");
        # fail loudly instead
        raise ValueError("unknown subcommand %s" % subcommand)