ocrd.processor.base   F

Complexity

Total Complexity 183

Size/Duplication

Total Lines 1313
Duplicated Lines 1.52 %

Importance

Changes 0
Metric Value
wmc 183
eloc 639
dl 20
loc 1313
rs 1.961
c 0
b 0
f 0

43 Methods

Rating   Name   Duplication   Size   Complexity  
A DummyFuture.result() 0 2 1
A MissingInputFile.__init__() 0 7 1
A NonUniqueInputFile.__init__() 0 7 1
A DummyFuture.__init__() 0 4 1
A DummyExecutor.__init__() 0 2 1
A ResourceNotFoundError.__init__() 0 6 1
A Processor.ocrd_tool() 0 11 1
A DummyExecutor.submit() 0 2 1
A Processor.parameter() 0 6 3
A DummyExecutor.shutdown() 0 3 1
A Processor.metadata_filename() 0 11 1
A Processor.metadata_location() 0 17 3
A Processor.version() 0 10 1
A Processor.executable() 0 12 1
A Processor.metadata_rawdict() 0 10 1
A Processor.metadata() 0 21 2
A Processor.process() 0 13 1
A Processor.show_version() 0 5 1
A Processor.setup() 0 9 1
B Processor.__init__() 0 71 8
C Processor.process_workspace_handle_tasks() 0 53 11
A Processor.__del__() 0 3 1
A Processor.list_resources() 0 6 2
D Processor.process_workspace() 0 73 13
C Processor.verify() 0 40 9
C Processor.process_workspace_handle_page_task() 0 58 11
A Processor.process_workspace_submit_page_task() 0 38 5
A Processor.process_workspace_submit_tasks() 0 32 2
A Processor._copy_page_file() 0 26 2
A Processor.dump_json() 0 5 1
A Processor.shutdown() 0 9 1
A Processor.dump_module_dir() 0 5 1
A Processor.show_help() 0 6 1
F Processor.zip_input_files() 20 121 29
C Processor.process_page_file() 0 73 11
A Processor.show_resource() 0 20 4
B Processor.list_all_resources() 0 15 7
A Processor.resolve_resource() 0 24 4
A Processor.add_metadata() 0 24 1
A Processor.input_files() 0 26 3
A Processor.process_page_pcgts() 0 19 1
A Processor.moduledir() 0 6 1
A Processor.module() 0 15 4

3 Functions

Rating   Name   Duplication   Size   Complexity  
F generate_processor_help() 0 147 18
A _page_worker() 0 19 4
A _page_worker_set_ctxt() 0 10 2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

A common solution to duplication problems is to extract the repeated code into a shared method or class, so each call site delegates to one implementation.
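As a minimal sketch of that extraction (hypothetical names, not taken from the module under analysis): two near-identical inline checks collapse into one shared helper.

```python
def _require_single(values, what):
    """Shared helper replacing two previously duplicated checks."""
    if len(values) != 1:
        raise ValueError(f"expected exactly one {what}, got {len(values)}")
    return values[0]

def pick_page(pages):
    # was: an inline len() == 1 check duplicated here ...
    return _require_single(pages, "page")

def pick_file(files):
    # ... and repeated here
    return _require_single(files, "file")
```

Once the logic lives in one place, fixing a bug or changing the error message happens once instead of at every duplicate site.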

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like ocrd.processor.base often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
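Applied to a class like `Processor` below, where `metadata_filename`, `metadata_location`, `metadata_rawdict` and `metadata` all share a prefix, Extract Class might look roughly like this sketch (hypothetical `ToolMetadata` and `SlimProcessor` names; heavily simplified, not the actual refactoring):

```python
import json
from pathlib import Path

class ToolMetadata:
    """Cohesive component: everything sharing the metadata_* prefix moves here."""
    def __init__(self, location: Path):
        self.location = location

    @property
    def rawdict(self) -> dict:
        # raw (unvalidated) tool description, analogous to metadata_rawdict
        return json.loads(self.location.read_text())

    @property
    def version(self) -> str:
        return self.rawdict['version']

class SlimProcessor:
    """The original class now delegates instead of doing everything itself."""
    def __init__(self, metadata_location: Path):
        self.metadata = ToolMetadata(metadata_location)
```

The host class shrinks to a delegation point, and the extracted component can be tested (and cached, validated, overridden) in isolation.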

"""
Processor base class and helper functions.
"""

__all__ = [
    'Processor',
    'generate_processor_help',
    'run_cli',
    'run_processor'
]

from functools import cached_property
from os.path import exists, join
from shutil import copyfileobj
import json
import os
from os import getcwd
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, get_args
import sys
import logging
import logging.handlers
import inspect
import tarfile
import io
from collections import defaultdict
from frozendict import frozendict
# concurrent.futures is buggy in py38,
# this is where the fixes came from:
from loky import Future, ProcessPoolExecutor
import multiprocessing as mp
from threading import Timer
from _thread import interrupt_main

from click import wrap_text
from deprecated import deprecated
from requests import HTTPError

from ..workspace import Workspace
from ..mets_server import ClientSideOcrdMets
from ocrd_models.ocrd_file import OcrdFileType
from .ocrd_page_result import OcrdPageResult
from ocrd_utils import (
    VERSION as OCRD_VERSION,
    MIMETYPE_PAGE,
    MIME_TO_EXT,
    config,
    getLogger,
    list_resource_candidates,
    pushd_popd,
    list_all_resources,
    get_processor_resource_types,
    resource_filename,
    parse_json_file_with_comments,
    make_file_id,
    deprecation_warning
)
from ocrd_validators import ParameterValidator
from ocrd_models.ocrd_page import (
    PageType,
    AlternativeImageType,
    MetadataItemType,
    LabelType,
    LabelsType,
    OcrdPage,
    to_xml,
)
from ocrd_modelfactory import page_from_file
from ocrd_validators.ocrd_tool_validator import OcrdToolValidator

# XXX imports must remain for backwards-compatibility
from .helpers import run_cli, run_processor # pylint: disable=unused-import

class ResourceNotFoundError(FileNotFoundError):
    """
    An exception signifying the requested processor resource
    cannot be resolved.
    """
    def __init__(self, name, executable):
        self.name = name
        self.executable = executable
        self.message = (f"Could not find resource '{name}' for executable '{executable}'. "
                        f"Try 'ocrd resmgr download {executable} {name}' to download this resource.")
        super().__init__(self.message)

class NonUniqueInputFile(ValueError):
    """
    An exception signifying the specified fileGrp / pageId / mimetype
    selector yields multiple PAGE files, or no PAGE files but multiple images,
    or multiple files of that mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        self.message = (f"Could not determine unique input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
        super().__init__(self.message)

class MissingInputFile(ValueError):
    """
    An exception signifying the specified fileGrp / pageId / mimetype
    selector yields no PAGE files, or no PAGE and no image files,
    or no files of that mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        self.message = (f"Could not find input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
        super().__init__(self.message)

class DummyFuture:
    """
    Mimics some of `concurrent.futures.Future` but runs immediately.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
    def result(self):
        return self.fn(*self.args, **self.kwargs)
class DummyExecutor:
    """
    Mimics some of `concurrent.futures.ProcessPoolExecutor` but runs
    everything immediately in this process.
    """
    def __init__(self, initializer=None, initargs=(), **kwargs):
        initializer(*initargs)
    def shutdown(self, **kwargs):
        # allow gc to catch processor instance (unless cached)
        _page_worker_set_ctxt(None, None)
    def submit(self, fn, *args, **kwargs) -> DummyFuture:
        return DummyFuture(fn, *args, **kwargs)

TFuture = Union[DummyFuture, Future]
TExecutor = Union[DummyExecutor, ProcessPoolExecutor]

class Processor():
    """
    A processor is a tool that implements the uniform OCR-D
    `command-line interface for run-time data processing <https://ocr-d.de/en/spec/cli>`_.

    That is, it executes a single workflow step, or a combination of workflow steps,
    on the workspace (represented by local METS). It reads input files for all or selected
    physical pages of the input fileGrp(s), computes additional annotation, and writes output
    files for them into the output fileGrp(s). It may take a number of optional or mandatory
    parameters.
    """

    max_instances : int = -1
    """
    maximum number of cached instances (ignored if negative), to be applied on top of
    :py:data:`~ocrd_utils.config.OCRD_MAX_PROCESSOR_CACHE` (i.e. whatever is smaller).

    (Override this if you know how many instances fit into memory - GPU / CPU RAM - at once.)
    """

    max_workers : int = -1
    """
    maximum number of processor forks for page-parallel processing (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_MAX_PARALLEL_PAGES` (i.e.
    whatever is smaller).

    (Override this if you know how many pages fit into processing units - GPU shaders / CPU cores
    - at once, or if your class already creates threads prior to forking, e.g. during ``setup``.)
    """

    max_page_seconds : int = -1
    """
    maximum number of seconds that may be spent processing a single page (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_PROCESSING_PAGE_TIMEOUT`
    (i.e. whatever is smaller).

    (Override this if you know how costly this processor may be, irrespective of image size
    or complexity of the page.)
    """

    @property
    def metadata_filename(self) -> str:
        """
        Relative location of the ``ocrd-tool.json`` file inside the package.

        Used by :py:data:`metadata_location`.

        (Override if ``ocrd-tool.json`` is not in the root of the module,
        e.g. ``namespace/ocrd-tool.json`` or ``data/ocrd-tool.json``).
        """
        return 'ocrd-tool.json'

    @cached_property
    def metadata_location(self) -> Path:
        """
        Absolute path of the ``ocrd-tool.json`` file as distributed with the package.

        Used by :py:data:`metadata_rawdict`.

        (Override if ``ocrd-tool.json`` is not distributed with the Python package.)
        """
        module = inspect.getmodule(self)
        module_tokens = module.__package__.split('.')
        # for namespace packages, we cannot just use the first token
        for i in range(len(module_tokens)):
            prefix = '.'.join(module_tokens[:i + 1])
            if sys.modules[prefix].__spec__.has_location:
                return resource_filename(prefix, self.metadata_filename)
        raise Exception(f"cannot find top-level module prefix for {module.__package__}")

    @cached_property
    def metadata_rawdict(self) -> dict:
        """
        Raw (unvalidated, unexpanded) ``ocrd-tool.json`` dict contents of the package.

        Used by :py:data:`metadata`.

        (Override if ``ocrd-tool.json`` is not in a file.)
        """
        return parse_json_file_with_comments(self.metadata_location)

    @cached_property
    def metadata(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of the package, according to the OCR-D
        `spec <https://ocr-d.de/en/spec/ocrd_tool>`_ for processor tools.

        After deserialisation, it also gets validated against the
        `schema <https://ocr-d.de/en/spec/ocrd_tool#definition>`_ with all defaults
        expanded.

        Used by :py:data:`ocrd_tool` and :py:data:`version`.

        (Override if you want to provide metadata programmatically instead of a
        JSON file.)
        """
        metadata = self.metadata_rawdict
        report = OcrdToolValidator.validate(metadata)
        if not report.is_valid:
            self.logger.error(f"The ocrd-tool.json of this processor is {'problematic' if not report.errors else 'invalid'}:\n"
                              f"{report.to_xml()}.\nPlease open an issue at {metadata.get('git_url', 'the website')}.")
        return metadata

    @cached_property
    def version(self) -> str:
        """
        The program version of the package.
        Usually the ``version`` part of :py:data:`metadata`.

        (Override if you do not want to use the :py:data:`metadata` lookup
        mechanism.)
        """
        return self.metadata['version']

    @cached_property
    def executable(self) -> str:
        """
        The executable name of this processor tool. Taken from the runtime
        filename.

        Used by :py:data:`ocrd_tool` for lookup in :py:data:`metadata`.

        (Override if your entry-point name deviates from the ``executable``
        name, or the processor gets instantiated from another runtime.)
        """
        return os.path.basename(inspect.stack()[-1].filename)

    @cached_property
    def ocrd_tool(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of this processor tool.
        Usually the :py:data:`executable` key of the ``tools`` part
        of :py:data:`metadata`.

        (Override if you do not want to use the :py:data:`metadata` lookup
        mechanism.)
        """
        return self.metadata['tools'][self.executable]

    @property
    def parameter(self) -> Optional[dict]:
        """the runtime parameter dict to be used by this processor"""
        if hasattr(self, '_parameter'):
            return self._parameter
        return None

    @parameter.setter
    def parameter(self, parameter : dict) -> None:
        if self.parameter is not None:
            self.shutdown()
        parameterValidator = ParameterValidator(self.ocrd_tool)
        report = parameterValidator.validate(parameter)
        if not report.is_valid:
            raise ValueError(f'Invalid parameters:\n{report.to_xml()}')
        # make parameter dict read-only
        self._parameter = frozendict(parameter)
        # (re-)run setup to load models etc
        self.setup()

    def __init__(
            self,
            # FIXME: remove in favor of process_workspace(workspace)
            workspace : Optional[Workspace],
            ocrd_tool=None,
            parameter=None,
            input_file_grp=None,
            output_file_grp=None,
            page_id=None,
            version=None
    ):
        """
        Instantiate, but do not setup (neither for processing nor other usage).
        If given, do parse and validate :py:data:`.parameter`.

        Args:
             workspace (:py:class:`~ocrd.Workspace`): The workspace to process. \
                 If not ``None``, then `chdir` to that directory.
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        Keyword Args:
             parameter (string): JSON of the runtime choices for ocrd-tool ``parameters``. \
                 Can be ``None`` even for processing, but then needs to be set before running.
             input_file_grp (string): comma-separated list of METS ``fileGrp`` used for input. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             output_file_grp (string): comma-separated list of METS ``fileGrp`` used for output. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             page_id (string): comma-separated list of METS physical ``page`` IDs to process \
                 (or empty for all pages). \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        """
        if ocrd_tool is not None:
            deprecation_warning("Passing 'ocrd_tool' as keyword argument to Processor is deprecated - "
                                "use or override metadata/executable/ocrd-tool properties instead")
            self.ocrd_tool = ocrd_tool
            self.executable = ocrd_tool['executable']
        if version is not None:
            deprecation_warning("Passing 'version' as keyword argument to Processor is deprecated - "
                                "use or override metadata/version properties instead")
            self.version = version
        if workspace is not None:
            deprecation_warning("Passing a workspace argument other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.workspace = workspace
            self.old_pwd = getcwd()
            os.chdir(self.workspace.directory)
        if input_file_grp is not None:
            deprecation_warning("Passing an input_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.input_file_grp = input_file_grp
        if output_file_grp is not None:
            deprecation_warning("Passing an output_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.output_file_grp = output_file_grp
        if page_id is not None:
            deprecation_warning("Passing a page_id kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.page_id = page_id or None
        self.download = config.OCRD_DOWNLOAD_INPUT
        #: The logger to be used by processor implementations.
        # `ocrd.processor.base` internals should use :py:attr:`self._base_logger`
        self.logger = getLogger(f'ocrd.processor.{self.__class__.__name__}')
        self._base_logger = getLogger('ocrd.processor.base')
        if parameter is not None:
            self.parameter = parameter
        # workaround for deprecated#72 (@deprecated decorator does not work for subclasses):
        setattr(self, 'process',
                deprecated(version='3.0', reason='process() should be replaced with process_page_pcgts() or process_page_file() or process_workspace()')(getattr(self, 'process')))

    def __del__(self):
        self._base_logger.debug("shutting down %s in %s", repr(self), mp.current_process().name)
        self.shutdown()

    def show_help(self, subcommand=None):
        """
        Print a usage description including the standard CLI and all of this processor's ocrd-tool
        parameters and docstrings.
        """
        print(generate_processor_help(self.ocrd_tool, processor_instance=self, subcommand=subcommand))

    def show_version(self):
        """
        Print information on this processor's version and OCR-D version.
        """
        print("Version %s, ocrd/core %s" % (self.version, OCRD_VERSION))

    def verify(self):
        """
        Verify that :py:attr:`input_file_grp` and :py:attr:`output_file_grp` fulfill the processor's requirements.
        """
        # verify input and output file groups in parameters
        assert self.input_file_grp is not None
        assert self.output_file_grp is not None
        input_file_grps = self.input_file_grp.split(',')
        output_file_grps = self.output_file_grp.split(',')
        def assert_file_grp_cardinality(grps : List[str], spec : Union[int, List[int]], msg):
            if isinstance(spec, int):
                if spec > 0:
                    assert len(grps) == spec, msg % (len(grps), str(spec))
            else:
                assert isinstance(spec, list)
                minimum = spec[0]
                maximum = spec[1]
                if minimum > 0:
                    assert len(grps) >= minimum, msg % (len(grps), str(spec))
                if maximum > 0:
                    assert len(grps) <= maximum, msg % (len(grps), str(spec))
        # FIXME: enforce unconditionally as soon as grace period for deprecation is over
        if 'input_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(input_file_grps, self.ocrd_tool['input_file_grp_cardinality'],
                                        "Unexpected number of input file groups %d vs %s")
        if 'output_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(output_file_grps, self.ocrd_tool['output_file_grp_cardinality'],
                                        "Unexpected number of output file groups %d vs %s")
        # verify input and output file groups in METS
        for input_file_grp in input_file_grps:
            assert input_file_grp in self.workspace.mets.file_groups, \
                f"input fileGrp {input_file_grp} does not exist in workspace {self.workspace}"
        for output_file_grp in output_file_grps:
            assert output_file_grp not in self.workspace.mets.file_groups \
                or config.OCRD_EXISTING_OUTPUT in ['OVERWRITE', 'SKIP'] \
                or not any(self.workspace.mets.find_files(
                    pageId=self.page_id, fileGrp=output_file_grp)), \
                    f"output fileGrp {output_file_grp} already exists in workspace {self.workspace}"
        # keep this for backwards compatibility:
        return True

    def dump_json(self):
        """
        Print :py:attr:`ocrd_tool` on stdout.
        """
        print(json.dumps(self.ocrd_tool, indent=True))

    def dump_module_dir(self):
        """
        Print :py:attr:`moduledir` on stdout.
        """
        print(self.moduledir)

    def list_resources(self):
        """
        Find all installed resource files in the search paths and print their path names.
        """
        for res in self.list_all_resources():
            print(res)

    def setup(self) -> None:
        """
        Prepare the processor for actual data processing,
        prior to changing to the workspace directory but
        after parsing parameters.

        (Override this to load models into memory etc.)
        """
        pass

    def shutdown(self) -> None:
        """
        Bring down the processor after data processing,
        after changing back from the workspace directory but
        before exiting (or setting up with different parameters).

        (Override this to unload models from memory etc.)
        """
        pass

    @deprecated(version='3.0', reason='process() should be replaced with process_page_pcgts() or process_page_file() or process_workspace()')
    def process(self) -> None:
        """
        Process all files of the :py:data:`workspace`
        from the given :py:data:`input_file_grp`
        to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        (This contains the main functionality and needs to be
        overridden by subclasses.)
        """
        raise NotImplementedError()

    def process_workspace(self, workspace: Workspace) -> None:
484
        """
485
        Process all files of the given ``workspace``,
486
        from the given :py:data:`input_file_grp`
487
        to the given :py:data:`output_file_grp`
488
        for the given :py:data:`page_id` (or all pages)
489
        under the given :py:data:`parameter`.
490
491
        Delegates to :py:meth:`.process_workspace_submit_tasks`
492
        and :py:meth:`.process_workspace_handle_tasks`.
493
494
        (This will iterate over pages and files, calling
495
        :py:meth:`.process_page_file` and handling exceptions.
496
        It should be overridden by subclasses to handle cases
497
        like post-processing or computation across pages.)
498
        """
499
        with pushd_popd(workspace.directory):
500
            self.workspace = workspace
501
            self.verify()
502
            try:
503
                # set up multitasking
504
                max_workers = max(0, config.OCRD_MAX_PARALLEL_PAGES)
505
                if self.max_workers > 0 and self.max_workers < config.OCRD_MAX_PARALLEL_PAGES:
506
                    self._base_logger.info("limiting number of workers from %d to %d", max_workers, self.max_workers)
507
                    max_workers = self.max_workers
508
                if max_workers > 1:
509
                    assert isinstance(workspace.mets, ClientSideOcrdMets), \
510
                        "OCRD_MAX_PARALLEL_PAGES>1 requires also using --mets-server-url"
511
                max_seconds = max(0, config.OCRD_PROCESSING_PAGE_TIMEOUT)
512
                if self.max_page_seconds > 0 and self.max_page_seconds < config.OCRD_PROCESSING_PAGE_TIMEOUT:
513
                    self._base_logger.info("limiting page timeout from %d to %d sec", max_seconds, self.max_page_seconds)
514
                    max_seconds = self.max_page_seconds
515
516
                if max_workers > 1:
517
                    executor_cls = ProcessPoolExecutor
518
                    log_queue = mp.get_context('fork').Queue()
519
                else:
520
                    executor_cls = DummyExecutor
521
                    log_queue = None
522
                executor = executor_cls(
523
                    max_workers=max_workers or 1,
524
                    # only forking method avoids pickling
525
                    context=mp.get_context('fork'),
526
                    # share processor instance as global to avoid pickling
527
                    initializer=_page_worker_set_ctxt,
528
                    initargs=(self, log_queue),
529
                )
530
                if max_workers > 1:
531
                    # forward messages from log queue (in subprocesses) to all root handlers
532
                    log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers, respect_handler_level=True)
533
                    log_listener.start()
534
                tasks = None
535
                try:
536
                    self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
537
                    tasks = self.process_workspace_submit_tasks(executor, max_seconds)
538
                    stats = self.process_workspace_handle_tasks(tasks)
539
                finally:
540
                    executor.shutdown(kill_workers=True, wait=False)
541
                    self._base_logger.debug("stopped executor %s after %d tasks", str(executor), len(tasks) if tasks else -1)
542
                    if max_workers > 1:
543
                        # can cause deadlock:
544
                        #log_listener.stop()
545
                        # not much better:
546
                        #log_listener.enqueue_sentinel()
547
                        pass
548
549
            except NotImplementedError:
550
                # fall back to deprecated method
551
                try:
552
                    self.process()
553
                except Exception as err:
554
                    # suppress the NotImplementedError context
555
                    raise err from None
556
557
    def process_workspace_submit_tasks(self, executor : TExecutor, max_seconds : int) -> Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]:
558
        """
559
        Look up all input files of the given ``workspace``
560
        from the given :py:data:`input_file_grp`
561
        for the given :py:data:`page_id` (or all pages),
562
        and schedules calling :py:meth:`.process_page_file`
563
        on them for each page via `executor` (enforcing
564
        a per-page time limit of `max_seconds`).
565
566
        When running with `OCRD_MAX_PARALLEL_PAGES>1` and
567
        the workspace via METS Server, the executor will fork
568
        this many worker parallel subprocesses each processing
569
        one page at a time. (Interprocess communication is
570
        done via task and result queues.)
571
572
        Otherwise, tasks are run sequentially in the
573
        current process.
574
575
        Delegates to :py:meth:`.zip_input_files` to get 
576
        the input files for each page, and then calls
577
        :py:meth:`.process_workspace_submit_page_task`.
578
579
        Returns a dict mapping the per-page tasks
580
        (i.e. futures submitted to the executor)
581
        to their corresponding pageId and input files.
582
        """
583
        tasks = {}
584
        for input_file_tuple in self.zip_input_files(on_error='abort', require_first=False):
585
            task, page_id, input_files = self.process_workspace_submit_page_task(executor, max_seconds, input_file_tuple)
586
            tasks[task] = (page_id, input_files)
587
        self._base_logger.debug("submitted %d processing tasks", len(tasks))
588
        return tasks
589
590
    def process_workspace_submit_page_task(self, executor : TExecutor, max_seconds : int, input_file_tuple : List[Optional[OcrdFileType]]) -> Tuple[TFuture, str, List[Optional[OcrdFileType]]]:
591
        """
592
        Ensure all input files for a single page are
593
        downloaded to the workspace, then schedule
594
        :py:meth:`.process_process_file` to be run on
595
        them via `executor` (enforcing a per-page time
596
        limit of `max_seconds`).
597
598
        Delegates to :py:meth:`.process_page_file`
599
        (wrapped in :py:func:`_page_worker` to share
600
        the processor instance across forked processes).
601
602
        \b
603
        Returns a tuple of:
604
        - the scheduled future object,
605
        - the corresponding pageId,
606
        - the corresponding input files.
607
        """
608
        input_files : List[Optional[OcrdFileType]] = [None] * len(input_file_tuple)
609
        page_id = next(input_file.pageId
610
                       for input_file in input_file_tuple
611
                       if input_file)
612
        self._base_logger.info(f"preparing page {page_id}")
613
        for i, input_file in enumerate(input_file_tuple):
614
            if input_file is None:
615
                # file/page not found in this file grp
616
                continue
617
            input_files[i] = input_file
618
            if not self.download:
619
                continue
620
            try:
621
                input_files[i] = self.workspace.download_file(input_file)
622
            except (ValueError, FileNotFoundError, HTTPError) as e:
623
                self._base_logger.error(repr(e))
624
                self._base_logger.warning(f"failed downloading file {input_file} for page {page_id}")
625
        # process page
626
        #executor.submit(self.process_page_file, *input_files)
627
        return executor.submit(_page_worker, max_seconds, *input_files), page_id, input_files

    def process_workspace_handle_tasks(self, tasks : Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]) -> Tuple[int, int, Dict[str, int], int]:
        """
        Look up scheduled per-page futures one by one,
        handle errors (exceptions) and gather results.

        \b
        Enforces policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns a tuple of:
        - the number of successfully processed pages
        - the number of failed (i.e. skipped or copied) pages
        - a dict of the type and corresponding number of exceptions seen
        - the number of total requested pages (i.e. success+fail+existing).

        Delegates to :py:meth:`.process_workspace_handle_page_task`
        for each page.
        """
        # aggregate info for logging:
        nr_succeeded = 0
        nr_failed = 0
        nr_errors = defaultdict(int) # count causes
        if config.OCRD_MISSING_OUTPUT == 'SKIP':
            reason = "skipped"
        elif config.OCRD_MISSING_OUTPUT == 'COPY':
            reason = "fallback-copied"
        for task in tasks:
            # wait for results, handle errors
            page_id, input_files = tasks[task]
            result = self.process_workspace_handle_page_task(page_id, input_files, task)
            if isinstance(result, Exception):
                nr_errors[result.__class__.__name__] += 1
                nr_failed += 1
                # FIXME: this is just prospective, because len(tasks)==nr_failed+nr_succeeded is not guaranteed
                if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / len(tasks) > config.OCRD_MAX_MISSING_OUTPUTS:
                    # already irredeemably many failures, stop short
                    nr_errors = dict(nr_errors)
                    raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_failed+nr_succeeded}, {str(nr_errors)})")
            elif result:
                nr_succeeded += 1
            # else skipped - already exists
        nr_errors = dict(nr_errors)
        nr_all = nr_succeeded + nr_failed
        if nr_failed > 0:
            if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / nr_all > config.OCRD_MAX_MISSING_OUTPUTS:
                raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_all}, {str(nr_errors)})")
            self._base_logger.warning("%s %d of %d pages due to %s", reason, nr_failed, nr_all, str(nr_errors))
        self._base_logger.debug("succeeded %d, missed %d of %d pages due to %s", nr_succeeded, nr_failed, nr_all, str(nr_errors))
        return nr_succeeded, nr_failed, nr_errors, len(tasks)
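The `OCRD_MAX_MISSING_OUTPUTS` ratio check used twice above boils down to a small predicate; this is a sketch with an illustrative function name, not ocrd code:

```python
def too_many_failures(nr_failed, nr_all, max_missing_ratio):
    # abort once the failure share exceeds the configured ratio;
    # a ratio <= 0 disables the check (as with OCRD_MAX_MISSING_OUTPUTS)
    return max_missing_ratio > 0 and nr_failed / nr_all > max_missing_ratio

print(too_many_failures(1, 10, 0.2))  # 10% failed, under the 20% limit
print(too_many_failures(3, 10, 0.2))  # 30% failed, over the limit
```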

    def process_workspace_handle_page_task(self, page_id : str, input_files : List[Optional[OcrdFileType]], task : TFuture) -> Union[bool, Exception]:
        """
        \b
        Await a single page result and handle errors (exceptions),
        enforcing policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns
        - true in case of success
        - false in case the output already exists
        - the exception in case of failure
        """
        # FIXME: differentiate error cases in various ways:
        # - ResourceNotFoundError → use ResourceManager to download (once), then retry
        # - transient (I/O or OOM) error → maybe sleep, retry
        # - persistent (data) error → skip / dummy / raise
        try:
            self._base_logger.debug("waiting for output of task %s (page %s)", task, page_id)
            # timeout kwarg on future is useless: it only raises TimeoutError here,
            # but does not stop the running process/thread, and executor itself
            # offers nothing to that effect:
            # task.result(timeout=max_seconds or None)
            # so we instead applied the timeout within the worker function
            task.result()
            return True
        except NotImplementedError:
            # exclude NotImplementedError, so we can try process() below
            raise
        # handle input failures separately
        except FileExistsError as err:
            if config.OCRD_EXISTING_OUTPUT == 'ABORT':
                raise err
            if config.OCRD_EXISTING_OUTPUT == 'SKIP':
                return False
            if config.OCRD_EXISTING_OUTPUT == 'OVERWRITE':
                # too late here, must not happen
                raise Exception(f"got {err} despite OCRD_EXISTING_OUTPUT==OVERWRITE")
        except KeyboardInterrupt:
            raise
        # broad coverage of output failures (including TimeoutError)
        except Exception as err:
            # FIXME: add re-usable/actionable logging
            if config.OCRD_MISSING_OUTPUT == 'ABORT':
                self._base_logger.error(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
                raise err
            self._base_logger.exception(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
            if config.OCRD_MISSING_OUTPUT == 'SKIP':
                pass
            elif config.OCRD_MISSING_OUTPUT == 'COPY':
                self._copy_page_file(input_files[0])
            else:
                desc = config.describe('OCRD_MISSING_OUTPUT', wrap_text=False, indent_text=False)
                raise ValueError(f"unknown configuration value {config.OCRD_MISSING_OUTPUT} - {desc}")
            return err

    def _copy_page_file(self, input_file : OcrdFileType) -> None:
        """
        Copy the given ``input_file`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`~ocrd_models.OcrdFile` per input fileGrp)
        and add it as if it was a processing result.
        """
        input_pcgts : OcrdPage
        assert isinstance(input_file, get_args(OcrdFileType))
        self._base_logger.debug(f"parsing file {input_file.ID} for page {input_file.pageId}")
        try:
            input_pcgts = page_from_file(input_file)
        except ValueError as err:
            # not PAGE and not an image to generate PAGE for
            self._base_logger.error(f"non-PAGE input for page {input_file.pageId}: {err}")
            return
        output_file_id = make_file_id(input_file, self.output_file_grp)
        input_pcgts.set_pcGtsId(output_file_id)
        self.add_metadata(input_pcgts)
        self.workspace.add_file(
            file_id=output_file_id,
            file_grp=self.output_file_grp,
            page_id=input_file.pageId,
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(input_pcgts),
        )

    def process_page_file(self, *input_files : Optional[OcrdFileType]) -> None:
        """
        Process the given ``input_files`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`.OcrdFile` per input fileGrp)
        under the given :py:data:`.parameter`, and make sure the
        results get added accordingly.

        (This uses :py:meth:`.process_page_pcgts`, but should be overridden by subclasses
        to handle cases like multiple output fileGrps, non-PAGE input etc.)
        """
        input_pcgts : List[Optional[OcrdPage]] = [None] * len(input_files)
        input_pos = next(i for i, input_file in enumerate(input_files) if input_file is not None)
        page_id = input_files[input_pos].pageId
        self._base_logger.info("processing page %s", page_id)
        for i, input_file in enumerate(input_files):
            if input_file is None:
                grp = self.input_file_grp.split(',')[i]
                self._base_logger.debug(f"ignoring missing file for input fileGrp {grp} for page {page_id}")
                continue
            assert isinstance(input_file, get_args(OcrdFileType))
            self._base_logger.debug(f"parsing file {input_file.ID} for page {page_id}")
            try:
                page_ = page_from_file(input_file)
                assert isinstance(page_, OcrdPage)
                input_pcgts[i] = page_
            except ValueError as err:
                # not PAGE and not an image to generate PAGE for
                self._base_logger.error(f"non-PAGE input for page {page_id}: {err}")
        output_file_id = make_file_id(input_files[input_pos], self.output_file_grp)
        if input_files[input_pos].fileGrp == self.output_file_grp:
            # input=output fileGrp: re-use ID exactly
            output_file_id = input_files[input_pos].ID
        output_file = next(self.workspace.mets.find_files(ID=output_file_id), None)
        if output_file and config.OCRD_EXISTING_OUTPUT != 'OVERWRITE':
            # short-cut avoiding useless computation:
            raise FileExistsError(
                f"A file with ID=={output_file_id} already exists {output_file} and neither force nor ignore are set"
            )
        result = self.process_page_pcgts(*input_pcgts, page_id=page_id)
        for image_result in result.images:
            image_file_id = f'{output_file_id}_{image_result.file_id_suffix}'
            image_file_path = join(self.output_file_grp, f'{image_file_id}.png')
            if isinstance(image_result.alternative_image, PageType):
                # special case: not an alternative image, but replacing the original image
                # (this is needed by certain processors when the original's coordinate system
                #  cannot or must not be kept)
                image_result.alternative_image.set_imageFilename(image_file_path)
                image_result.alternative_image.set_imageWidth(image_result.pil.width)
                image_result.alternative_image.set_imageHeight(image_result.pil.height)
            elif isinstance(image_result.alternative_image, AlternativeImageType):
                image_result.alternative_image.set_filename(image_file_path)
            elif image_result.alternative_image is None:
                pass # do not reference in PAGE result
            else:
                raise ValueError(f"process_page_pcgts returned an OcrdPageResultImage of unknown type "
                                 f"{type(image_result.alternative_image)}")
            self.workspace.save_image_file(
                image_result.pil,
                image_file_id,
                self.output_file_grp,
                page_id=page_id,
                file_path=image_file_path,
            )
        result.pcgts.set_pcGtsId(output_file_id)
        self.add_metadata(result.pcgts)
        self.workspace.add_file(
            file_id=output_file_id,
            file_grp=self.output_file_grp,
            page_id=page_id,
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(result.pcgts),
        )

    def process_page_pcgts(self, *input_pcgts : Optional[OcrdPage], page_id : Optional[str] = None) -> OcrdPageResult:
        """
        Process the given ``input_pcgts`` of the :py:data:`.workspace`,
        representing one physical page (passed as one parsed
        :py:class:`.OcrdPage` per input fileGrp)
        under the given :py:data:`.parameter`, and return the
        resulting :py:class:`.OcrdPageResult`.

        Optionally, add to the ``images`` attribute of the resulting
        :py:class:`.OcrdPageResult` instances of :py:class:`.OcrdPageResultImage`,
        which have required fields for ``pil`` (:py:class:`PIL.Image` image data),
        ``file_id_suffix`` (used for generating IDs of the saved image) and
        ``alternative_image`` (reference of the :py:class:`ocrd_models.ocrd_page.AlternativeImageType`
        for setting the filename of the saved image).

        (This contains the main functionality and must be overridden by subclasses,
        unless it does not get called by some overridden :py:meth:`.process_page_file`.)
        """
        raise NotImplementedError()
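For illustration, a minimal subclass override might look as follows. `OcrdPageResult` is stubbed with a dataclass so the sketch runs without ocrd installed; a real processor would instead import it from ocrd and subclass `Processor`:

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional

@dataclass
class OcrdPageResult:  # stub of ocrd's result container, for this sketch only
    pcgts: Any
    images: List[Any] = field(default_factory=list)

class MyDummyProcessor:  # hypothetical; in practice, subclass ocrd.Processor
    def process_page_pcgts(self, *input_pcgts, page_id: Optional[str] = None) -> OcrdPageResult:
        pcgts = input_pcgts[0]
        # ... analyse and annotate the PAGE tree here ...
        return OcrdPageResult(pcgts=pcgts)

result = MyDummyProcessor().process_page_pcgts("<pcgts/>", page_id="PHYS_0001")
print(result.pcgts)
```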

    def add_metadata(self, pcgts: OcrdPage) -> None:
        """
        Add PAGE-XML :py:class:`~ocrd_models.ocrd_page.MetadataItemType` ``MetadataItem`` describing
        the processing step and runtime parameters to :py:class:`.OcrdPage` ``pcgts``.
        """
        metadata_obj = pcgts.get_Metadata()
        assert metadata_obj is not None
        metadata_obj.add_MetadataItem(
                MetadataItemType(type_="processingStep",
                    name=self.ocrd_tool['steps'][0],
                    value=self.ocrd_tool['executable'],
                    Labels=[LabelsType(
                        externalModel="ocrd-tool",
                        externalId="parameters",
                        Label=[LabelType(type_=name,
                                         value=self.parameter[name])
                               for name in self.parameter.keys()]),
                            LabelsType(
                        externalModel="ocrd-tool",
                        externalId="version",
                        Label=[LabelType(type_=self.ocrd_tool['executable'],
                                         value=self.version),
                               LabelType(type_='ocrd/core',
                                         value=OCRD_VERSION)])
                    ]))

    def resolve_resource(self, val):
        """
        Resolve a resource name to an absolute file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_

        Args:
            val (string): resource value to resolve
        """
        executable = self.ocrd_tool['executable']
        if exists(val):
            self._base_logger.debug("Resolved to absolute path %s" % val)
            return val
        # FIXME: remove once workspace arg / old_pwd is gone:
        if hasattr(self, 'old_pwd'):
            cwd = self.old_pwd
        else:
            cwd = getcwd()
        ret = [cand for cand in list_resource_candidates(executable, val,
                                                         cwd=cwd, moduled=self.moduledir)
               if exists(cand)]
        if ret:
            self._base_logger.debug("Resolved %s to absolute path %s" % (val, ret[0]))
            return ret[0]
        raise ResourceNotFoundError(val, executable)
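The candidate search can be sketched in plain Python (the `resolve` helper and file names are hypothetical; the real implementation delegates to `list_resource_candidates`):

```python
import os
import tempfile

def resolve(val, candidates):
    # an existing (absolute or relative) path wins outright
    if os.path.exists(val):
        return val
    # otherwise the first existing candidate (cwd, data dirs, module dir, ...)
    for cand in candidates:
        if os.path.exists(cand):
            return cand
    raise FileNotFoundError(f"resource not found: {val}")

tmpdir = tempfile.mkdtemp()
model = os.path.join(tmpdir, "model.bin")
open(model, "w").close()
resolved = resolve("model.bin", [os.path.join(tmpdir, "nope.bin"), model])
print(resolved)
```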

    def show_resource(self, val):
        """
        Resolve a resource name to a file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_,
        then print its contents to stdout.

        Args:
            val (string): resource value to show
        """
        res_fname = self.resolve_resource(val)
        fpath = Path(res_fname)
        if fpath.is_dir():
            with pushd_popd(fpath):
                fileobj = io.BytesIO()
                with tarfile.open(fileobj=fileobj, mode='w:gz') as tarball:
                    tarball.add('.')
                fileobj.seek(0)
                copyfileobj(fileobj, sys.stdout.buffer)
        else:
            sys.stdout.buffer.write(fpath.read_bytes())
937
    def list_all_resources(self):
938
        """
939
        List all resources found in the filesystem and matching content-type by filename suffix
940
        """
941
        mimetypes = get_processor_resource_types(None, self.ocrd_tool)
942
        for res in list_all_resources(self.ocrd_tool['executable'], moduled=self.moduledir):
943
            res = Path(res)
944
            if not '*/*' in mimetypes:
945
                if res.is_dir() and not 'text/directory' in mimetypes:
946
                    continue
947
                # if we do not know all MIME types, then keep the file, otherwise require suffix match
948
                if res.is_file() and not any(res.suffix == MIME_TO_EXT.get(mime, res.suffix)
949
                                             for mime in mimetypes):
950
                    continue
951
            yield res

    @property
    def module(self):
        """
        The top-level module this processor belongs to.
        """
        # find shortest prefix path that is not just a namespace package
        fqname = ''
        for name in self.__module__.split('.'):
            if fqname:
                fqname += '.'
            fqname += name
            if getattr(sys.modules[fqname], '__file__', None):
                return fqname
        # fall-back
        return self.__module__

    @property
    def moduledir(self):
        """
        The filesystem path of the module directory.
        """
        return resource_filename(self.module, '.')

    @property
    def input_files(self):
        """
        List the input files (for single-valued :py:attr:`input_file_grp`).

        For each physical page:

        - If there is a single PAGE-XML for the page, take it (and forget about all
          other files for that page)
        - Else if there is a single image file, take it (and forget about all other
          files for that page)
        - Otherwise raise an error (complaining that only PAGE-XML warrants
          having multiple images for a single page)

        See `algorithm <https://github.com/cisocrgroup/ocrd_cis/pull/57#issuecomment-656336593>`_

        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` objects.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")
        ret = self.zip_input_files(mimetype=None, on_error='abort')
        if not ret:
            return []
        assert len(ret[0]) == 1, 'Use zip_input_files() instead of input_files when processing multiple input fileGrps'
        return [tuples[0] for tuples in ret]

    def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
        """
        List tuples of input files (for multi-valued :py:attr:`input_file_grp`).

        Processors that expect/need multiple input file groups,
        cannot use :py:data:`input_files`. They must align (zip) input files
        across pages. This includes the case where not all pages
        are equally present in all file groups. It also requires
        making a consistent selection if there are multiple files
        per page.

        Following the OCR-D functional model, this function tries to
        find a single PAGE file per page, or fall back to a single
        image file per page. In either case, multiple matches per page
        are an error (see error handling below).
        This default behaviour can be changed by using a fixed MIME
        type filter via :py:attr:`mimetype`. But still, multiple matching
        files per page are an error.

        Single-page multiple-file errors are handled according to
        :py:attr:`on_error`:

        - if ``skip``, then the page for the respective fileGrp will be
          silently skipped (as if there was no match at all)
        - if ``first``, then the first matching file for the page will be
          silently selected (as if the first was the only match)
        - if ``last``, then the last matching file for the page will be
          silently selected (as if the last was the only match)
        - if ``abort``, then an exception will be raised.

        Multiple matches for PAGE-XML will always raise an exception.

        Keyword Args:
             require_first (boolean): If true, then skip a page entirely
                 whenever it is not available in the first input `fileGrp`.
             on_error (string): How to handle multiple file matches per page.
             mimetype (string): If not `None`, filter by the specified MIME
                 type (literal or regex prefixed by `//`). Otherwise prefer
                 PAGE or image.
        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` tuples.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")

        ifgs = self.input_file_grp.split(",")
        # Iterating over all files repeatedly may seem inefficient at first sight,
        # but the unnecessary OcrdFile instantiations for posterior fileGrp filtering
        # can actually be much more costly than traversing the ltree.
        # This might depend on the number of pages vs number of fileGrps.

        pages = {}
        for i, ifg in enumerate(ifgs):
            files_ = sorted(self.workspace.mets.find_all_files(
                    pageId=self.page_id, fileGrp=ifg, mimetype=mimetype),
                                # sort by MIME type so PAGE comes before images
                                key=lambda file_: file_.mimetype)
            for file_ in files_:
                if not file_.pageId:
                    # ignore document-global files
                    continue
                ift = pages.setdefault(file_.pageId, [None]*len(ifgs))
                if ift[i]:
                    self._base_logger.debug(f"another file {file_.ID} for page {file_.pageId} in input file group {ifg}")
                    # fileGrp has multiple files for this page ID
                    if mimetype:
                        # filter was active, this must not happen
                        self._base_logger.warning(f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                                                  f"conflicts with file {ift[i].ID} of same MIME type {mimetype} - on_error={on_error}")
                        if on_error == 'skip':
                            ift[i] = None
                        elif on_error == 'first':
                            pass # keep first match
                        elif on_error == 'last':
                            ift[i] = file_
                        elif on_error == 'abort':
                            raise NonUniqueInputFile(ifg, file_.pageId, mimetype)
                        else:
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype != MIMETYPE_PAGE):
                        pass # keep PAGE match
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype == MIMETYPE_PAGE):
                        raise NonUniqueInputFile(ifg, file_.pageId, None)
                    else:
                        # filter was inactive but no PAGE is in control, this must not happen
                        self._base_logger.warning(f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                                                  f"conflicts with file {ift[i].ID} but no PAGE available - on_error={on_error}")
                        if on_error == 'skip':
                            ift[i] = None
                        elif on_error == 'first':
                            pass # keep first match
                        elif on_error == 'last':
                            ift[i] = file_
                        elif on_error == 'abort':
                            raise NonUniqueInputFile(ifg, file_.pageId, None)
                        else:
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
                else:
                    self._base_logger.debug(f"adding file {file_.ID} for page {file_.pageId} to input file group {ifg}")
                    ift[i] = file_
        # Warn if no files found but pageId was specified, because that might be due to invalid page_id (range)
        if self.page_id and not any(pages):
            self._base_logger.critical(f"Could not find any files for selected pageId {self.page_id}.\n"
                                       f"compare '{self.page_id}' with the output of 'ocrd workspace list-page'.")
        ifts = []
        for page, ifiles in pages.items():
            for i, ifg in enumerate(ifgs):
                if not ifiles[i]:
                    # could be from non-unique with on_error=skip or from true gap
                    self._base_logger.error(f'Found no file for page {page} in file group {ifg}')
                    if config.OCRD_MISSING_INPUT == 'ABORT':
                        raise MissingInputFile(ifg, page, mimetype)
            if not any(ifiles):
                # must be from non-unique with on_error=skip
                self._base_logger.warning(f'Found no files for {page} - skipping')
                continue
            if ifiles[0] or not require_first:
                ifts.append(tuple(ifiles))
        return ifts
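The core alignment step (one row of per-fileGrp slots per page, `None` marking gaps) can be sketched with plain tuples instead of `OcrdFile` objects; `zip_by_page` and the toy data are illustrative only:

```python
# toy files: (fileGrp, pageId, mimetype)
files = [
    ("OCR-D-IMG", "PHYS_0001", "image/png"),
    ("OCR-D-SEG", "PHYS_0001", "application/vnd.prima.page+xml"),
    ("OCR-D-IMG", "PHYS_0002", "image/png"),
    # PHYS_0002 is missing from OCR-D-SEG: a gap in the second fileGrp
]

def zip_by_page(files, ifgs):
    # one row per page, one slot per input fileGrp, None marking gaps
    pages = {}
    for grp, page, mime in files:
        row = pages.setdefault(page, [None] * len(ifgs))
        row[ifgs.index(grp)] = (grp, page, mime)
    return [tuple(row) for row in pages.values()]

rows = zip_by_page(files, ["OCR-D-IMG", "OCR-D-SEG"])
print(rows)
```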

_page_worker_processor = None
"""
This global binding for the processor is required to avoid
squeezing the processor through a mp.Queue (which is impossible
due to unpicklable attributes like .workspace.mets._tree anyway)
when calling Processor.process_page_file as page worker processes
in Processor.process_workspace. Forking allows inheriting global
objects, and with the METS Server we do not mutate the local
processor instance anyway.
"""
def _page_worker_set_ctxt(processor, log_queue):
    """
    Overwrites `ocrd.processor.base._page_worker_processor` instance
    for sharing with subprocesses in ProcessPoolExecutor initializer.
    """
    global _page_worker_processor
    _page_worker_processor = processor
    if log_queue:
        # replace all log handlers with just one queue handler
        logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]
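The queue-handler swap can be demonstrated on an ordinary named logger (using an in-process `queue.Queue` rather than a multiprocessing queue, purely for illustration):

```python
import logging
import logging.handlers
import queue

log_queue = queue.Queue()
worker_logger = logging.getLogger("page_worker_sketch")
# replace the logger's handlers with a single QueueHandler, as the
# pool initializer does for logging.root in each forked worker
worker_logger.handlers = [logging.handlers.QueueHandler(log_queue)]
worker_logger.propagate = False
worker_logger.setLevel(logging.DEBUG)

worker_logger.warning("processing page %s", "PHYS_0001")
record = log_queue.get_nowait()  # a consumer would drain this centrally
print(record.getMessage())
```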

def _page_worker(timeout, *input_files):
    """
    Wraps a `Processor.process_page_file` call as payload (call target)
    of the ProcessPoolExecutor workers, but also enforces the given timeout.
    """
    page_id = next((file.pageId for file in input_files
                    if hasattr(file, 'pageId')), "")
    if timeout > 0:
        timer = Timer(timeout, interrupt_main)
        timer.start()
    try:
        _page_worker_processor.process_page_file(*input_files)
        _page_worker_processor.logger.debug("page worker completed for page %s", page_id)
    except KeyboardInterrupt:
        _page_worker_processor.logger.debug("page worker timed out for page %s", page_id)
        raise TimeoutError()
    finally:
        if timeout > 0:
            timer.cancel()
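The timeout pattern used here (a `threading.Timer` firing `interrupt_main`) can be shown standalone; note it only works when the guarded call runs in the main thread, and `run_with_timeout` is an illustrative name:

```python
import time
from threading import Timer
from _thread import interrupt_main

def run_with_timeout(func, timeout):
    # same pattern as _page_worker: a Timer fires interrupt_main, which
    # raises KeyboardInterrupt in the main thread to abort a long call
    timer = Timer(timeout, interrupt_main)
    timer.start()
    try:
        return func()
    except KeyboardInterrupt:
        raise TimeoutError()
    finally:
        timer.cancel()

print(run_with_timeout(lambda: "done", 5.0))
try:
    run_with_timeout(lambda: time.sleep(10), 0.2)
except TimeoutError:
    print("timed out")
```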
1165
1166
def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
1167
    """Generate a string describing the full CLI of this processor including params.
1168
1169
    Args:
1170
         ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
1171
         processor_instance (object, optional): the processor implementation
1172
             (for adding any module/class/function docstrings)
1173
        subcommand (string): 'worker' or 'server'
1174
    """
1175
    doc_help = ''
1176
    if processor_instance:
1177
        module = inspect.getmodule(processor_instance)
1178
        if module and module.__doc__:
1179
            doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
1180
        if processor_instance.__doc__:
1181
            doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
1182
        # Try to find the most concrete docstring among the various methods that an implementation
1183
        # could overload, first serving.
1184
        # In doing so, compare with Processor to avoid a glitch in the way py>=3.5 inherits docstrings.
1185
        # (They are supposed to only repeat information inspect.getdoc, rather than inherit __doc__ itself.)
1186
        for method in ['process_page_pcgts', 'process_page_file', 'process_workspace', 'process']:
1187
            instance_method = getattr(processor_instance, method)
1188
            superclass_method = getattr(Processor, method)
1189
            if instance_method.__doc__ and instance_method.__doc__ != superclass_method.__doc__:
1190
                doc_help += '\n' + inspect.cleandoc(instance_method.__doc__) + '\n'
1191
                break
1192
        if doc_help:
1193
            doc_help = '\n\n' + wrap_text(doc_help, width=72,
1194
                                          initial_indent='  > ',
1195
                                          subsequent_indent='  > ',
1196
                                          preserve_paragraphs=True)
1197
    subcommands = '''\
1198
    worker      Start a processing worker rather than do local processing
1199
    server      Start a processor server rather than do local processing
1200
'''
1201
1202
    processing_worker_options = '''\
1203
  --queue                         The RabbitMQ server address in format
1204
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
1205
                                  [amqp://admin:admin@localhost:5672]
1206
  --database                      The MongoDB server address in format
1207
                                  "mongodb://{host}:{port}"
1208
                                  [mongodb://localhost:27018]
1209
  --log-filename                  Filename to redirect STDOUT/STDERR to,
1210
                                  if specified.
1211
'''
1212
1213
    processing_server_options = '''\
  --address                       The Processor server address in format
                                  "{host}:{port}"
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
'''

    processing_options = '''\
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
  -I, --input-file-grp USE        File group(s) used as input
  -O, --output-file-grp USE       File group(s) used as output
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
  --overwrite                     Remove existing output pages/images
                                  (with "--page-id", remove only those).
                                  Short-hand for OCRD_EXISTING_OUTPUT=OVERWRITE
  --debug                         Abort on any errors with full stack trace.
                                  Short-hand for OCRD_MISSING_OUTPUT=ABORT
  --profile                       Enable profiling
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
                                  or JSON file path
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                  taking precedence over --parameter
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
                                  If URL starts with http:// start an HTTP server there,
                                  otherwise URL is a path to an on-demand-created unix socket
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
                                  Override log level globally [INFO]
  --log-filename LOG-PATH         File to redirect stderr logging to (overriding ocrd_logging.conf).
'''

    information_options = '''\
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
  -L, --list-resources            List names of processor resources
  -J, --dump-json                 Dump tool description as JSON
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
  -h, --help                      Show this message
  -V, --version                   Show version
'''

    parameter_help = ''
    if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']:
        parameter_help = '  NONE\n'
    else:
        def wrap(s):
            return wrap_text(s, initial_indent=' '*3,
                             subsequent_indent=' '*4,
                             width=72, preserve_paragraphs=True)
        for param_name, param in ocrd_tool['parameters'].items():
            parameter_help += wrap('"%s" [%s%s]' % (
                param_name,
                param['type'],
                ' - REQUIRED' if 'required' in param and param['required'] else
                ' - %s' % json.dumps(param['default']) if 'default' in param else ''))
            parameter_help += '\n ' + wrap(param['description'])
            if 'enum' in param:
                parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum']))
            parameter_help += "\n"

    if not subcommand:
        return f'''\
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]

  {ocrd_tool['description']}{doc_help}

Subcommands:
{subcommands}
Options for processing:
{processing_options}
Options for information:
{information_options}
Parameters:
{parameter_help}
'''
    elif subcommand == 'worker':
        return f'''\
Usage: {ocrd_tool['executable']} worker [OPTIONS]

  Run {ocrd_tool['executable']} as a processing worker.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_worker_options}
'''
    elif subcommand == 'server':
        return f'''\
Usage: {ocrd_tool['executable']} server [OPTIONS]

  Run {ocrd_tool['executable']} as a processor server.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_server_options}
'''
    else:
        pass
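

# ----------------------------------------------------------------------
# Illustrative sketch (NOT part of the module): how the parameter_help
# section of generate_processor_help() above renders a toy tool
# description. The real code uses wrap_text() from the CLI utilities;
# here it is approximated with the standard library's textwrap purely
# for demonstration. All names with a _demo suffix are hypothetical.
import json
import textwrap

def _wrap_demo(s):
    # rough stand-in for wrap_text(s, initial_indent=' '*3,
    #                              subsequent_indent=' '*4, width=72)
    return textwrap.fill(s, width=72, initial_indent=' ' * 3,
                         subsequent_indent=' ' * 4)

def render_parameter_help_demo(parameters):
    # mirrors the parameter_help loop in generate_processor_help():
    # header line with type and REQUIRED/default marker, wrapped
    # description, optional enum listing, one parameter per paragraph
    if not parameters:
        return '  NONE\n'
    help_text = ''
    for name, param in parameters.items():
        suffix = (' - REQUIRED' if param.get('required') else
                  ' - %s' % json.dumps(param['default']) if 'default' in param
                  else '')
        help_text += _wrap_demo('"%s" [%s%s]' % (name, param['type'], suffix))
        help_text += '\n ' + _wrap_demo(param['description'])
        if 'enum' in param:
            help_text += '\n ' + _wrap_demo(
                'Possible values: %s' % json.dumps(param['enum']))
        help_text += '\n'
    return help_text

if __name__ == '__main__':
    demo_params = {'dpi': {'type': 'number', 'default': 300,
                           'description': 'pixel density to assume for input images'}}
    print(render_parameter_help_demo(demo_params))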