Passed
Pull Request — master (#1345)
by
unknown
02:14
created

ocrd.processor.base.Processor.verify()   C

Complexity

Conditions 9

Size

Total Lines 41
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 41
rs 6.6666
c 0
b 0
f 0
cc 9
nop 1
1
"""
2
Processor base class and helper functions.
3
"""
4
5
__all__ = [
6
    'Processor',
7
    'generate_processor_help',
8
    'run_cli',
9
    'run_processor'
10
]
11
12
from functools import cached_property
13
from os.path import exists, join
14
from shutil import copyfileobj
15
import json
16
import os
17
from os import getcwd
18
from pathlib import Path
19
from typing import Dict, List, Optional, Tuple, Union, get_args
20
import sys
21
import logging
22
import logging.handlers
23
import inspect
24
import tarfile
25
import io
26
from collections import defaultdict
27
from frozendict import frozendict
28
# concurrent.futures is buggy in py38,
29
# this is where the fixes came from:
30
#from loky import Future, ProcessPoolExecutor
31
from pebble import ProcessFuture as Future, ProcessPool as ProcessPoolExecutor
32
from concurrent.futures import TimeoutError
33
import multiprocessing as mp
34
from click import wrap_text
35
from deprecated import deprecated
36
from requests import HTTPError
37
38
from ..workspace import Workspace
39
from ..mets_server import ClientSideOcrdMets
40
from ocrd_models.ocrd_file import OcrdFileType
41
from .ocrd_page_result import OcrdPageResult
42
from ocrd_utils import (
43
    VERSION as OCRD_VERSION,
44
    MIMETYPE_PAGE,
45
    config,
46
    getLogger,
47
    list_resource_candidates,
48
    list_all_resources,
49
    get_processor_resource_types,
50
    resource_filename,
51
    parse_json_file_with_comments,
52
    pushd_popd,
53
    make_file_id,
54
    deprecation_warning
55
)
56
from ocrd_validators import ParameterValidator
57
from ocrd_models.ocrd_page import (
58
    PageType,
59
    AlternativeImageType,
60
    MetadataItemType,
61
    LabelType,
62
    LabelsType,
63
    OcrdPage,
64
    to_xml,
65
)
66
from ocrd_modelfactory import page_from_file
67
from ocrd_validators.ocrd_tool_validator import OcrdToolValidator
68
69
# XXX imports must remain for backwards-compatibility
70
from .helpers import run_cli, run_processor  # pylint: disable=unused-import
71
72
73
class ResourceNotFoundError(FileNotFoundError):
    """
    Raised when a requested processor resource cannot be resolved.
    """
    def __init__(self, name, executable):
        msg = (f"Could not find resource '{name}' for executable '{executable}'. "
               f"Try 'ocrd resmgr download {executable} {name}' to download this resource.")
        # keep the lookup context available for programmatic inspection
        self.name = name
        self.executable = executable
        self.message = msg
        super().__init__(msg)
84
85
86
class NonUniqueInputFile(ValueError):
    """
    Raised when the specified fileGrp / pageId / mimetype
    selector yields multiple PAGE files, or no PAGE files but multiple images,
    or multiple files of that mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        # retain the selector so callers can inspect what was ambiguous
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        selector = mimetype or 'PAGE+image(s)'
        self.message = (f"Could not determine unique input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {selector}")
        super().__init__(self.message)
99
100
101
class MissingInputFile(ValueError):
    """
    Raised when the specified fileGrp / pageId / mimetype
    selector yields no PAGE files, or no PAGE and no image files,
    or no files of that mimetype.
    """
    def __init__(self, fileGrp, pageId, mimetype):
        # retain the selector so callers can inspect what was missing
        self.fileGrp = fileGrp
        self.pageId = pageId
        self.mimetype = mimetype
        selector = mimetype or 'PAGE+image(s)'
        self.message = (f"Could not find input file for fileGrp {fileGrp} "
                        f"and pageId {pageId} under mimetype {selector}")
        super().__init__(self.message)
114
115
116
class DummyFuture:
    """
    Stand-in for part of the `concurrent.futures.Future` interface:
    captures a call at construction time and executes it synchronously
    (in the current process) when :py:meth:`result` is invoked.
    """
    def __init__(self, fn, *args, **kwargs):
        # only record the call; nothing runs until result()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def result(self):
        # execute now, propagating any exception to the caller
        return self.fn(*self.args, **self.kwargs)
127
128
129
class DummyExecutor:
    """
    Mimics some of `concurrent.futures.ProcessPoolExecutor` but runs
    everything immediately in this process.

    Used by :py:meth:`Processor.process_workspace` when no METS Server
    (and thus no page parallelism) is configured.
    """
    def __init__(self, initializer=None, initargs=(), **kwargs):
        # BUGFIX: tolerate initializer=None, as the default advertises —
        # previously this raised TypeError; ProcessPoolExecutor treats
        # a missing initializer as a no-op, so we do the same.
        if initializer is not None:
            initializer(*initargs)

    def stop(self):
        # allow gc to catch processor instance (unless cached)
        _page_worker_set_ctxt(None, None)

    def join(self, **kwargs):
        # nothing to wait for: all work ran synchronously in the caller
        pass

    def submit(self, fn, timeout, *args, **kwargs) -> DummyFuture:
        # FIXME: there is nothing we can do in uniprocessing to enforce timeouts
        return DummyFuture(fn, *args, **kwargs)
147
148
149
TFuture = Union[DummyFuture, Future]
150
TExecutor = Union[DummyExecutor, ProcessPoolExecutor]
151
152
153
class Processor():
    """
    A processor is a tool that implements the uniform OCR-D
    `command-line interface for run-time data processing <https://ocr-d.de/en/spec/cli>`_.

    That is, it executes a single workflow step, or a combination of workflow steps,
    on the workspace (represented by local METS). It reads input files for all or selected
    physical pages of the input fileGrp(s), computes additional annotation, and writes output
    files for them into the output fileGrp(s). It may take a number of optional or mandatory
    parameters.
    """

    max_instances: int = -1
    """
    maximum number of cached instances (ignored if negative), to be applied on top of
    :py:data:`~ocrd_utils.config.OCRD_MAX_PROCESSOR_CACHE` (i.e. whatever is smaller).

    (Override this if you know how many instances fit into memory - GPU / CPU RAM - at once.)
    """

    max_workers: int = -1
    """
    maximum number of processor forks for page-parallel processing (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_MAX_PARALLEL_PAGES` (i.e.
    whatever is smaller).

    (Override this if you know how many pages fit into processing units - GPU shaders / CPU cores
    - at once, or if your class already creates threads prior to forking, e.g. during ``setup``.)
    """

    max_page_seconds: int = -1
    """
    maximum number of seconds that may be spent processing a single page (ignored if negative),
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_PROCESSING_PAGE_TIMEOUT`
    (i.e. whatever is smaller).

    (Override this if you know how costly this processor may be, irrespective of image size
    or complexity of the page.)
    """
192
193
    @property
    def metadata_filename(self) -> str:
        """
        Path of the ``ocrd-tool.json`` file, relative to the package root.

        Consumed by :py:data:`metadata_location`.

        (Override if ``ocrd-tool.json`` lives elsewhere in the module,
        e.g. ``namespace/ocrd-tool.json`` or ``data/ocrd-tool.json``).
        """
        return "ocrd-tool.json"
204
205
    @cached_property
    def metadata_location(self) -> Path:
        """
        Absolute path of the ``ocrd-tool.json`` file as distributed with the package.

        Used by :py:data:`metadata_rawdict`.

        (Override if ``ocrd-tool.json`` is not distributed with the Python package.)

        Raises:
            ValueError: if no (sub)package prefix of this module has a loadable location.
        """
        module = inspect.getmodule(self)
        module_tokens = module.__package__.split('.')
        # for namespace packages, we cannot just use the first token
        for i in range(len(module_tokens)):
            prefix = '.'.join(module_tokens[:i + 1])
            if sys.modules[prefix].__spec__.has_location:
                return resource_filename(prefix, self.metadata_filename)
        # BUGFIX: was `raise Exception("... %s", pkg)`, which stored the %-template
        # and argument as a tuple instead of interpolating the package name
        raise ValueError(f"cannot find top-level module prefix for {module.__package__}")
222
223
    @cached_property
    def metadata_rawdict(self) -> dict:
        """
        Raw (unvalidated, unexpanded) ``ocrd-tool.json`` dict contents of the package.

        Used by :py:data:`metadata`.

        (Override if ``ocrd-tool.json`` is not in a file.)
        """
        location = self.metadata_location
        return parse_json_file_with_comments(location)
233
234
    @cached_property
    def metadata(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of the package, according to the OCR-D
        `spec <https://ocr-d.de/en/spec/ocrd_tool>`_ for processor tools.

        After deserialisation, it also gets validated against the
        `schema <https://ocr-d.de/en/spec/ocrd_tool#definition>`_ with all defaults
        expanded.

        Used by :py:data:`ocrd_tool` and :py:data:`version`.

        (Override if you want to provide metadata programmatically instead of a
        JSON file.)
        """
        raw = self.metadata_rawdict
        report = OcrdToolValidator.validate(raw)
        if report.is_valid:
            return raw
        # invalid tool-json is only logged, never fatal (returned regardless)
        severity = 'problematic' if not report.errors else 'invalid'
        self.logger.error(f"The ocrd-tool.json of this processor is {severity}:\n"
                          f"{report.to_xml()}.\nPlease open an issue at {raw.get('git_url', 'the website')}.")
        return raw
255
256
    @cached_property
    def version(self) -> str:
        """
        The program version of the package, i.e. the ``version``
        entry of :py:data:`metadata`.

        (Override if you do not want to use the :py:data:`metadata` lookup
        mechanism.)
        """
        return self.metadata['version']
266
267
    @cached_property
    def executable(self) -> str:
        """
        The executable name of this processor tool, derived from the
        file name of the outermost stack frame (i.e. the runtime script).

        Used by :py:data:`ocrd_tool` for lookup in :py:data:`metadata`.

        (Override if your entry-point name deviates from the ``executable``
        name, or the processor gets instantiated from another runtime.)
        """
        outermost_frame = inspect.stack()[-1]
        return os.path.basename(outermost_frame.filename)
279
280
    @cached_property
    def ocrd_tool(self) -> dict:
        """
        The ``ocrd-tool.json`` dict contents of this processor tool:
        the entry under :py:data:`executable` in the ``tools`` section
        of :py:data:`metadata`.

        (Override if you do not want to use the :py:data:`metadata` lookup
        mechanism.)
        """
        tools = self.metadata['tools']
        return tools[self.executable]
291
292
    @property
    def parameter(self) -> Optional[dict]:
        """the runtime parameter dict to be used by this processor"""
        # _parameter only exists after the setter has validated and frozen a dict
        return getattr(self, '_parameter', None)
298
299
    @parameter.setter
    def parameter(self, parameter: dict) -> None:
        """Validate, freeze and apply a new runtime parameter dict, (re-)running setup."""
        # tear down any prior instantiation before switching parameter sets
        if self.parameter is not None:
            self.shutdown()
        report = ParameterValidator(self.ocrd_tool).validate(parameter)
        if not report.is_valid:
            raise ValueError(f'Invalid parameters:\n{report.to_xml()}')
        # make parameter dict read-only
        self._parameter = frozendict(parameter)
        # (re-)run setup to load models etc
        self.setup()
311
312
    def __init__(
            self,
            # FIXME: remove in favor of process_workspace(workspace)
            workspace: Optional[Workspace],
            ocrd_tool=None,
            parameter=None,
            input_file_grp=None,
            output_file_grp=None,
            page_id=None,
            version=None
    ):
        """
        Instantiate, but do not setup (neither for processing nor other usage).
        If given, do parse and validate :py:data:`.parameter`.

        Args:
             workspace (:py:class:`~ocrd.Workspace`): The workspace to process. \
                 If not ``None``, then `chdir` to that directory.
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        Keyword Args:
             parameter (string): JSON of the runtime choices for ocrd-tool ``parameters``. \
                 Can be ``None`` even for processing, but then needs to be set before running.
             input_file_grp (string): comma-separated list of METS ``fileGrp`` used for input. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             output_file_grp (string): comma-separated list of METS ``fileGrp`` used for output. \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
             page_id (string): comma-separated list of METS physical ``page`` IDs to process \
                 (or empty for all pages). \
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
                 before processing.
        """
        # Each legacy kwarg below shadows the corresponding (cached) property
        # on this instance when given, and emits a deprecation warning.
        if ocrd_tool is not None:
            deprecation_warning("Passing 'ocrd_tool' as keyword argument to Processor is deprecated - "
                                "use or override metadata/executable/ocrd-tool properties instead")
            self.ocrd_tool = ocrd_tool
            self.executable = ocrd_tool['executable']
        if version is not None:
            deprecation_warning("Passing 'version' as keyword argument to Processor is deprecated - "
                                "use or override metadata/version properties instead")
            self.version = version
        if workspace is not None:
            deprecation_warning("Passing a workspace argument other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.workspace = workspace
            # remember where we came from, then chdir into the workspace
            self.old_pwd = getcwd()
            os.chdir(self.workspace.directory)
        if input_file_grp is not None:
            deprecation_warning("Passing an input_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.input_file_grp = input_file_grp
        if output_file_grp is not None:
            deprecation_warning("Passing an output_file_grp kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            self.output_file_grp = output_file_grp
        if page_id is not None:
            deprecation_warning("Passing a page_id kwarg other than 'None' to Processor "
                                "is deprecated - pass as argument to process_workspace instead")
            # normalize empty string (meaning "all pages") to None
            self.page_id = page_id or None
        self.download = config.OCRD_DOWNLOAD_INPUT
        #: The logger to be used by processor implementations.
        # `ocrd.processor.base` internals should use :py:attr:`self._base_logger`
        self.logger = getLogger(f'ocrd.processor.{self.__class__.__name__}')
        self._base_logger = getLogger('ocrd.processor.base')
        # note: assigning triggers the property setter, i.e. validation + setup()
        if parameter is not None:
            self.parameter = parameter
        # workaround for deprecated#72 (@deprecated decorator does not work for subclasses):
        setattr(self, 'process', deprecated(
            version='3.0', reason='process() should be replaced '
            'with process_page_pcgts() or process_page_file() or process_workspace()')(
                getattr(self, 'process')))
385
386
    def __del__(self):
        """Log and delegate to :py:meth:`shutdown` when this instance is garbage-collected."""
        self._base_logger.debug("shutting down %s in %s", repr(self), mp.current_process().name)
        self.shutdown()
389
390
    def show_help(self, subcommand=None):
        """
        Print a usage description including the standard CLI and all of this processor's ocrd-tool
        parameters and docstrings.
        """
        help_text = generate_processor_help(self.ocrd_tool, processor_instance=self, subcommand=subcommand)
        print(help_text)
396
397
    def show_version(self):
        """
        Print information on this processor's version and OCR-D version.
        """
        print(f"Version {self.version}, ocrd/core {OCRD_VERSION}")
402
403
    def verify(self):
        """
        Verify that :py:attr:`input_file_grp` and :py:attr:`output_file_grp` fulfill the processor's requirements.

        Checks (each failure raises :py:exc:`AssertionError`):
        - both fileGrp attributes are set,
        - the number of comma-separated groups matches the ``*_file_grp_cardinality``
          declared in :py:attr:`ocrd_tool` (if declared),
        - every input group exists in the workspace METS,
        - no output group exists yet (unless ``OCRD_EXISTING_OUTPUT`` allows it,
          or it contains no files for the selected :py:attr:`page_id`).

        Returns:
            True (kept for backwards compatibility; failures raise instead).
        """
        # NOTE(review): all checks use `assert`, which is stripped under `python -O` —
        # validation then silently disappears
        # verify input and output file groups in parameters
        assert self.input_file_grp is not None
        assert self.output_file_grp is not None
        input_file_grps = self.input_file_grp.split(',')
        output_file_grps = self.output_file_grp.split(',')

        def assert_file_grp_cardinality(grps: List[str], spec: Union[int, List[int]], msg):
            # spec is either an exact count (int) or a [minimum, maximum] pair;
            # non-positive values mean "unconstrained"
            if isinstance(spec, int):
                if spec > 0:
                    assert len(grps) == spec, msg % (len(grps), str(spec))
            else:
                assert isinstance(spec, list)
                minimum = spec[0]
                maximum = spec[1]
                if minimum > 0:
                    assert len(grps) >= minimum, msg % (len(grps), str(spec))
                if maximum > 0:
                    assert len(grps) <= maximum, msg % (len(grps), str(spec))
        # FIXME: enforce unconditionally as soon as grace period for deprecation is over
        if 'input_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(input_file_grps, self.ocrd_tool['input_file_grp_cardinality'],
                                        "Unexpected number of input file groups %d vs %s")
        if 'output_file_grp_cardinality' in self.ocrd_tool:
            assert_file_grp_cardinality(output_file_grps, self.ocrd_tool['output_file_grp_cardinality'],
                                        "Unexpected number of output file groups %d vs %s")
        # verify input and output file groups in METS
        for input_file_grp in input_file_grps:
            assert input_file_grp in self.workspace.mets.file_groups, \
                f"input fileGrp {input_file_grp} does not exist in workspace {self.workspace}"
        for output_file_grp in output_file_grps:
            # an existing output group is tolerated if overwriting/skipping is configured,
            # or if it has no files for the pages selected here
            assert (output_file_grp not in self.workspace.mets.file_groups
                    or config.OCRD_EXISTING_OUTPUT in ['OVERWRITE', 'SKIP']
                    or not any(self.workspace.mets.find_files(
                        pageId=self.page_id, fileGrp=output_file_grp))), \
                    f"output fileGrp {output_file_grp} already exists in workspace {self.workspace}"
        # keep this for backwards compatibility:
        return True
444
445
    def dump_json(self):
        """
        Print :py:attr:`ocrd_tool` on stdout.
        """
        # NOTE: indent=True is equivalent to indent=1 (bool is an int subclass)
        print(json.dumps(self.ocrd_tool, indent=True))
450
451
    def dump_module_dir(self):
        """
        Print :py:attr:`moduledir` on stdout.
        """
        # moduledir is provided elsewhere on this class (not visible in this chunk)
        print(self.moduledir)
456
457
    def list_resources(self):
        """
        Find all installed resource files in the search paths and print their path names.
        """
        # one path per line; prints nothing when no resources are installed
        for resource_path in self.list_all_resources():
            print(resource_path)
463
464
    def setup(self) -> None:
        """
        Prepare the processor for actual data processing —
        runs after parameters have been parsed, but before
        changing into the workspace directory.

        (Override this to load models into memory etc.)
        """
        pass
473
474
    def shutdown(self) -> None:
        """
        Bring down the processor after data processing,
        after changing back from the workspace directory but
        before exiting (or setting up with different parameters).

        (Override this to unload models from memory etc.)
        """
        pass
483
484
    @deprecated(version='3.0', reason='process() should be replaced '
                'with process_page_pcgts() or process_page_file() or process_workspace()')
    def process(self) -> None:
        """
        Process all files of the :py:data:`workspace`
        from the given :py:data:`input_file_grp`
        to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        (This contains the main functionality and needs to be
        overridden by subclasses.)

        Raises:
            NotImplementedError: always, unless overridden; the base
                :py:meth:`process_workspace` catches this to detect
                legacy subclasses.
        """
        raise NotImplementedError()
498
499
    def process_workspace(self, workspace: Workspace) -> None:
        """
        Process all files of the given ``workspace``,
        from the given :py:data:`input_file_grp`
        to the given :py:data:`output_file_grp`
        for the given :py:data:`page_id` (or all pages)
        under the given :py:data:`parameter`.

        Delegates to :py:meth:`.process_workspace_submit_tasks`
        and :py:meth:`.process_workspace_handle_tasks`.

        (This will iterate over pages and files, calling
        :py:meth:`.process_page_file` and handling exceptions.
        It should be overridden by subclasses to handle cases
        like post-processing or computation across pages.)
        """
        with pushd_popd(workspace.directory):
            self.workspace = workspace
            self.verify()
            try:
                # set up multitasking
                # effective worker count: configured value, capped by the class limit
                max_workers = max(0, config.OCRD_MAX_PARALLEL_PAGES)
                if self.max_workers > 0 and self.max_workers < config.OCRD_MAX_PARALLEL_PAGES:
                    self._base_logger.info("limiting number of workers from %d to %d", max_workers, self.max_workers)
                    max_workers = self.max_workers
                if max_workers > 1:
                    # true parallelism needs a METS Server to serialize METS access
                    assert isinstance(workspace.mets, ClientSideOcrdMets), \
                        "OCRD_MAX_PARALLEL_PAGES>1 requires also using --mets-server-url"
                # effective per-page timeout: configured value, capped by the class limit
                max_seconds = max(0, config.OCRD_PROCESSING_PAGE_TIMEOUT)
                if self.max_page_seconds > 0 and self.max_page_seconds < config.OCRD_PROCESSING_PAGE_TIMEOUT:
                    self._base_logger.info("limiting page timeout from %d to %d sec", max_seconds, self.max_page_seconds)
                    max_seconds = self.max_page_seconds

                # with a METS Server we can fork real worker processes;
                # otherwise everything runs sequentially via DummyExecutor
                if isinstance(workspace.mets, ClientSideOcrdMets):
                    executor_cls = ProcessPoolExecutor
                    log_queue = mp.get_context('fork').Queue()
                else:
                    executor_cls = DummyExecutor
                    log_queue = None
                executor = executor_cls(
                    max_workers=max_workers or 1,
                    # only forking method avoids pickling
                    context=mp.get_context('fork'),
                    # share processor instance as global to avoid pickling
                    initializer=_page_worker_set_ctxt,
                    initargs=(self, log_queue),
                )
                if isinstance(workspace.mets, ClientSideOcrdMets):
                    assert executor.active # ensure pre-forking
                    # forward messages from log queue (in subprocesses) to all root handlers
                    log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers,
                                                                  respect_handler_level=True)
                    log_listener.start()
                tasks = None
                try:
                    self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
                    tasks = self.process_workspace_submit_tasks(executor, max_seconds)
                    # NOTE(review): stats is unused here — presumably kept for
                    # subclasses/debugging; confirm before removing
                    stats = self.process_workspace_handle_tasks(tasks)
                finally:
                    executor.stop()
                    executor.join(timeout=3.0) # raises TimeoutError
                    self._base_logger.debug("stopped executor %s after %d tasks", str(executor), len(tasks) if tasks else -1)
                    if isinstance(workspace.mets, ClientSideOcrdMets):
                        # can cause deadlock:
                        #log_listener.stop()
                        # not much better:
                        #log_listener.enqueue_sentinel()
                        pass

            except NotImplementedError:
                # fall back to deprecated method
                try:
                    self.process()
                except Exception as err:
                    # suppress the NotImplementedError context
                    raise err from None
575
576
    def process_workspace_submit_tasks(self, executor: TExecutor, max_seconds: int) -> Dict[
            TFuture, Tuple[str, List[Optional[OcrdFileType]]]]:
        """
        Look up all input files of the given ``workspace``
        from the given :py:data:`input_file_grp`
        for the given :py:data:`page_id` (or all pages),
        and schedules calling :py:meth:`.process_page_file`
        on them for each page via `executor` (enforcing
        a per-page time limit of `max_seconds`).

        When running with `OCRD_MAX_PARALLEL_PAGES>1` and
        the workspace via METS Server, the executor will fork
        this many worker parallel subprocesses each processing
        one page at a time. (Interprocess communication is
        done via task and result queues.)

        Otherwise, tasks are run sequentially in the
        current process.

        Delegates to :py:meth:`.zip_input_files` to get
        the input files for each page, and then calls
        :py:meth:`.process_workspace_submit_page_task`.

        Returns a dict mapping the per-page tasks
        (i.e. futures submitted to the executor)
        to their corresponding pageId and input files.
        """
        tasks = {
            task: (page_id, input_files)
            for task, page_id, input_files in (
                self.process_workspace_submit_page_task(executor, max_seconds, input_file_tuple)
                for input_file_tuple in self.zip_input_files(on_error='abort', require_first=False))
        }
        self._base_logger.debug("submitted %d processing tasks", len(tasks))
        return tasks
609
610
    def process_workspace_submit_page_task(self, executor: TExecutor, max_seconds: int,
                                           input_file_tuple: List[Optional[OcrdFileType]]) -> Tuple[
                                               TFuture, str, List[Optional[OcrdFileType]]]:
        """
        Ensure all input files for a single page are
        downloaded to the workspace, then schedule
        :py:meth:`.process_page_file` to be run on
        them via `executor` (enforcing a per-page time
        limit of `max_seconds`).

        Delegates to :py:meth:`.process_page_file`
        (wrapped in :py:func:`_page_worker` to share
        the processor instance across forked processes).

        \b
        Returns a tuple of:
        - the scheduled future object,
        - the corresponding pageId,
        - the corresponding input files.
        """
        input_files: List[Optional[OcrdFileType]] = [None] * len(input_file_tuple)
        # pageId of the first fileGrp that actually has a file for this page
        page_id = next(input_file.pageId
                       for input_file in input_file_tuple
                       if input_file)
        self._base_logger.info(f"preparing page {page_id}")
        for i, input_file in enumerate(input_file_tuple):
            if input_file is None:
                # file/page not found in this file grp
                continue
            input_files[i] = input_file
            if not self.download:
                continue
            try:
                # replace the METS reference with the locally downloaded file
                input_files[i] = self.workspace.download_file(input_file)
            except (ValueError, FileNotFoundError, HTTPError) as e:
                # best-effort: log and keep the undownloaded file reference
                self._base_logger.error(repr(e))
                self._base_logger.warning(f"failed downloading file {input_file} for page {page_id}")
        # process page
        #executor.submit(self.process_page_file, *input_files)
        return executor.submit(_page_worker, max_seconds, *input_files), page_id, input_files
650
651
    def process_workspace_handle_tasks(self, tasks: Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]) -> Tuple[
            int, int, Dict[str, int], int]:
        """
        Look up scheduled per-page futures one by one,
        handle errors (exceptions) and gather results.

        \b
        Enforces policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns a tuple of:
        - the number of successfully processed pages
        - the number of failed (i.e. skipped or copied) pages
        - a dict of the type and corresponding number of exceptions seen
        - the number of total requested pages (i.e. success+fail+existing).

        Delegates to :py:meth:`.process_workspace_handle_page_task`
        for each page.
        """
        # aggregate info for logging:
        nr_succeeded = 0
        nr_failed = 0
        nr_errors = defaultdict(int)  # count causes
        if config.OCRD_MISSING_OUTPUT == 'SKIP':
            reason = "skipped"
        elif config.OCRD_MISSING_OUTPUT == 'COPY':
            reason = "fallback-copied"
        else:
            # BUGFIX: `reason` used to be unbound here, raising NameError instead
            # of the intended message when failures occurred under any other
            # OCRD_MISSING_OUTPUT policy (e.g. ABORT)
            reason = "failed"
        for task in tasks:
            # wait for results, handle errors
            page_id, input_files = tasks[task]
            result = self.process_workspace_handle_page_task(page_id, input_files, task)
            if isinstance(result, Exception):
                nr_errors[result.__class__.__name__] += 1
                nr_failed += 1
                # FIXME: this is just prospective, because len(tasks)==nr_failed+nr_succeeded is not guaranteed
                if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / len(tasks) > config.OCRD_MAX_MISSING_OUTPUTS:
                    # already irredeemably many failures, stop short
                    nr_errors = dict(nr_errors)
                    raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_failed+nr_succeeded}, "
                                    f"{str(nr_errors)})")
            elif result:
                nr_succeeded += 1
            # else skipped - already exists
        nr_errors = dict(nr_errors)
        nr_all = nr_succeeded + nr_failed
        if nr_failed > 0:
            if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / nr_all > config.OCRD_MAX_MISSING_OUTPUTS:
                raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_all}, {str(nr_errors)})")
            self._base_logger.warning("%s %d of %d pages due to %s", reason, nr_failed, nr_all, str(nr_errors))
        self._base_logger.debug("succeeded %d, missed %d of %d pages due to %s",
                                nr_succeeded, nr_failed, nr_all, str(nr_errors))
        return nr_succeeded, nr_failed, nr_errors, len(tasks)
707
708
    def process_workspace_handle_page_task(self, page_id: str, input_files: List[Optional[OcrdFileType]],
                                           task: TFuture) -> Union[bool, Exception]:
        """
        \b
        Await a single page result and handle errors (exceptions),
        enforcing policies configured by the following
        environment variables:
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).

        \b
        Returns
        - true in case of success
        - false in case the output already exists
        - the exception in case of failure
        """
        # FIXME: differentiate error cases in various ways:
        # - ResourceNotFoundError → use ResourceManager to download (once), then retry
        # - transient (I/O or OOM) error → maybe sleep, retry
        # - persistent (data) error → skip / dummy / raise
        try:
            self._base_logger.debug("waiting for output of task %s (page %s)", task, page_id)
            # timeout kwarg on future is useless: it only raises TimeoutError here,
            # but does not stop the running process/thread, and executor itself
            # offers nothing to that effect:
            # task.result(timeout=max_seconds or None)
            # so we instead passed the timeout to the submit (schedule) function
            task.result()
            self._base_logger.debug("page worker completed for page %s", page_id)
            return True
        except NotImplementedError:
            # exclude NotImplementedError, so we can try process() below
            raise
        # handle input failures separately
        except FileExistsError as err:
            # the page worker refused to overwrite an existing output file
            if config.OCRD_EXISTING_OUTPUT == 'ABORT':
                raise err
            if config.OCRD_EXISTING_OUTPUT == 'SKIP':
                return False
            if config.OCRD_EXISTING_OUTPUT == 'OVERWRITE':
                # too late here, must not happen
                raise Exception(f"got {err} despite OCRD_EXISTING_OUTPUT==OVERWRITE")
            # NOTE(review): if OCRD_EXISTING_OUTPUT has none of the three values above,
            # control falls through and this method implicitly returns None — confirm
            # whether config guarantees the value is always one of ABORT/SKIP/OVERWRITE
        except KeyboardInterrupt:
            # never swallow user interruption
            raise
        # broad coverage of output failures (including TimeoutError)
        except (Exception, TimeoutError) as err:
            if isinstance(err, TimeoutError):
                self._base_logger.debug("page worker timed out for page %s", page_id)
            # FIXME: add re-usable/actionable logging
            if config.OCRD_MISSING_OUTPUT == 'ABORT':
                self._base_logger.error(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
                raise err
            self._base_logger.exception(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
            if config.OCRD_MISSING_OUTPUT == 'SKIP':
                pass
            elif config.OCRD_MISSING_OUTPUT == 'COPY':
                # fall back to copying the (first) input file as a pseudo-result
                self._copy_page_file(input_files[0])
            else:
                desc = config.describe('OCRD_MISSING_OUTPUT', wrap_text=False, indent_text=False)
                raise ValueError(f"unknown configuration value {config.OCRD_MISSING_OUTPUT} - {desc}")
            # signal failure to the caller (who aggregates error statistics)
            return err
770
771
    def _copy_page_file(self, input_file: OcrdFileType) -> None:
        """
        Copy the given ``input_file`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`~ocrd_models.OcrdFile` per input fileGrp)
        and add it as if it was a processing result.
        """
        assert isinstance(input_file, get_args(OcrdFileType))
        self._base_logger.debug(f"parsing file {input_file.ID} for page {input_file.pageId}")
        try:
            pcgts: OcrdPage = page_from_file(input_file)
        except ValueError as err:
            # neither PAGE-XML nor an image from which PAGE could be generated
            self._base_logger.error(f"non-PAGE input for page {input_file.pageId}: {err}")
            return
        file_id = make_file_id(input_file, self.output_file_grp)
        pcgts.set_pcGtsId(file_id)
        self.add_metadata(pcgts)
        # register the copy as a new PAGE-XML file in the output fileGrp
        self.workspace.add_file(
            file_id=file_id,
            file_grp=self.output_file_grp,
            page_id=input_file.pageId,
            local_filename=os.path.join(self.output_file_grp, file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(pcgts),
        )
798
799
    def process_page_file(self, *input_files: Optional[OcrdFileType]) -> None:
        """
        Process the given ``input_files`` of the :py:data:`workspace`,
        representing one physical page (passed as one opened
        :py:class:`.OcrdFile` per input fileGrp)
        under the given :py:data:`.parameter`, and make sure the
        results get added accordingly.

        (This uses :py:meth:`.process_page_pcgts`, but should be overridden by subclasses
        to handle cases like multiple output fileGrps, non-PAGE input etc.)

        Raises:
            FileExistsError: if a file with the output ID already exists and
                ``OCRD_EXISTING_OUTPUT`` is not ``OVERWRITE``.
            MissingInputFile: if an input file has no local copy and
                ``OCRD_MISSING_INPUT`` is ``ABORT``.
        """
        input_pcgts: List[Optional[OcrdPage]] = [None] * len(input_files)
        # use the first fileGrp that actually has a file to determine the page ID
        input_pos = next(i for i, input_file in enumerate(input_files) if input_file is not None)
        page_id = input_files[input_pos].pageId
        self._base_logger.info("processing page %s", page_id)
        # parse each available input file into a PAGE tree (aligned with input fileGrps by index)
        for i, input_file in enumerate(input_files):
            grp = self.input_file_grp.split(',')[i]
            if input_file is None:
                self._base_logger.debug(f"ignoring missing file for input fileGrp {grp} for page {page_id}")
                continue
            assert isinstance(input_file, get_args(OcrdFileType))
            if not input_file.local_filename:
                # file is referenced in METS but has no local copy
                self._base_logger.error(f'No local file exists for page {page_id} in file group {grp}')
                if config.OCRD_MISSING_INPUT == 'ABORT':
                    raise MissingInputFile(grp, page_id, input_file.mimetype)
                continue
            self._base_logger.debug(f"parsing file {input_file.ID} for page {page_id}")
            try:
                page_ = page_from_file(input_file)
                assert isinstance(page_, OcrdPage)
                input_pcgts[i] = page_
            except ValueError as err:
                # not PAGE and not an image to generate PAGE for
                self._base_logger.error(f"non-PAGE input for page {page_id}: {err}")
        if not any(input_pcgts):
            # nothing parseable for this page in any input fileGrp
            self._base_logger.warning(f'skipping page {page_id}')
            return
        output_file_id = make_file_id(input_files[input_pos], self.output_file_grp)
        if input_files[input_pos].fileGrp == self.output_file_grp:
            # input=output fileGrp: re-use ID exactly
            output_file_id = input_files[input_pos].ID
        output_file = next(self.workspace.mets.find_files(ID=output_file_id), None)
        if output_file and config.OCRD_EXISTING_OUTPUT != 'OVERWRITE':
            # short-cut avoiding useless computation:
            raise FileExistsError(
                f"A file with ID=={output_file_id} already exists {output_file} and neither force nor ignore are set"
            )
        # delegate the actual per-page processing to the subclass implementation
        result = self.process_page_pcgts(*input_pcgts, page_id=page_id)
        # save all derived images and wire them into the PAGE result
        for image_result in result.images:
            image_file_id = f'{output_file_id}_{image_result.file_id_suffix}'
            image_file_path = join(self.output_file_grp, f'{image_file_id}.png')
            if isinstance(image_result.alternative_image, PageType):
                # special case: not an alternative image, but replacing the original image
                # (this is needed by certain processors when the original's coordinate system
                #  cannot or must not be kept)
                image_result.alternative_image.set_imageFilename(image_file_path)
                image_result.alternative_image.set_imageWidth(image_result.pil.width)
                image_result.alternative_image.set_imageHeight(image_result.pil.height)
            elif isinstance(image_result.alternative_image, AlternativeImageType):
                image_result.alternative_image.set_filename(image_file_path)
            elif image_result.alternative_image is None:
                pass  # do not reference in PAGE result
            else:
                raise ValueError(f"process_page_pcgts returned an OcrdPageResultImage of unknown type "
                                 f"{type(image_result.alternative_image)}")
            self.workspace.save_image_file(
                image_result.pil,
                image_file_id,
                self.output_file_grp,
                page_id=page_id,
                file_path=image_file_path,
            )
        # finalize and register the PAGE-XML result file itself
        result.pcgts.set_pcGtsId(output_file_id)
        self.add_metadata(result.pcgts)
        self.workspace.add_file(
            file_id=output_file_id,
            file_grp=self.output_file_grp,
            page_id=page_id,
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
            mimetype=MIMETYPE_PAGE,
            content=to_xml(result.pcgts),
        )
881
882
    def process_page_pcgts(self, *input_pcgts: Optional[OcrdPage], page_id: Optional[str] = None) -> OcrdPageResult:
        """
        Process one physical page, given as one parsed
        :py:class:`.OcrdPage` per input fileGrp in ``input_pcgts``,
        under the current :py:data:`.parameter` settings of the
        :py:data:`.workspace`, and return an :py:class:`.OcrdPageResult`.

        Implementations may append :py:class:`.OcrdPageResultImage`
        instances to the result's ``images`` attribute; each must provide
        ``pil`` (:py:class:`PIL.Image` image data), ``file_id_suffix``
        (used for generating IDs of the saved image), and
        ``alternative_image`` (the :py:class:`ocrd_models.ocrd_page.AlternativeImageType`
        reference whose filename gets set once the image is saved).

        (This contains the main functionality. Subclasses must override it,
        unless they override :py:meth:`.process_page_file` so that it is
        never called.)
        """
        raise NotImplementedError()
901
902
    def add_metadata(self, pcgts: OcrdPage) -> None:
        """
        Attach a PAGE-XML :py:class:`~ocrd_models.ocrd_page.MetadataItemType`
        ``MetadataItem`` to :py:class:`.OcrdPage` ``pcgts``, recording this
        processing step together with its runtime parameters and versions.
        """
        metadata = pcgts.get_Metadata()
        assert metadata is not None
        # one label per runtime parameter
        parameter_labels = LabelsType(
            externalModel="ocrd-tool",
            externalId="parameters",
            Label=[LabelType(type_=key, value=self.parameter[key])
                   for key in self.parameter.keys()])
        # record both the tool's own version and the core framework version
        version_labels = LabelsType(
            externalModel="ocrd-tool",
            externalId="version",
            Label=[LabelType(type_=self.ocrd_tool['executable'], value=self.version),
                   LabelType(type_='ocrd/core', value=OCRD_VERSION)])
        metadata.add_MetadataItem(MetadataItemType(
            type_="processingStep",
            name=self.ocrd_tool['steps'][0],
            value=self.ocrd_tool['executable'],
            Labels=[parameter_labels, version_labels]))
928
929
    def resolve_resource(self, val):
        """
        Resolve a resource name to an absolute file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_

        Args:
            val (string): resource value to resolve
        Returns:
            Absolute file path (string) of the resolved resource.
        Raises:
            ResourceNotFoundError: if no matching file exists anywhere.
        """
        executable = self.ocrd_tool['executable']
        if exists(val):
            # already an existing (absolute or CWD-relative) path - take as-is
            # (use lazy %-args so the logging module formats only if emitted)
            self._base_logger.debug("Resolved to absolute path %s", val)
            return val
        # FIXME: remove once workspace arg / old_pwd is gone:
        # prefer the original working directory if the processor chdir'ed away
        if hasattr(self, 'old_pwd'):
            cwd = self.old_pwd
        else:
            cwd = getcwd()
        candidates = [candidate
                      for candidate in list_resource_candidates(
                          executable, val, cwd=cwd, moduled=self.moduledir)
                      if exists(candidate)]
        if candidates:
            self._base_logger.debug("Resolved %s to absolute path %s", val, candidates[0])
            return candidates[0]
        raise ResourceNotFoundError(val, executable)
952
953
    def show_resource(self, val):
        """
        Resolve a resource name to a file path with the algorithm in
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_,
        then print its contents to stdout.

        Args:
            val (string): resource value to show
        """
        location = Path(self.resolve_resource(val))
        if not location.is_dir():
            # plain file: dump its bytes verbatim
            sys.stdout.buffer.write(location.read_bytes())
        else:
            # directory: stream it as a gzipped tarball of its contents
            with pushd_popd(location):
                buffer = io.BytesIO()
                with tarfile.open(fileobj=buffer, mode='w:gz') as archive:
                    archive.add('.')
                buffer.seek(0)
                copyfileobj(buffer, sys.stdout.buffer)
973
974
    def list_all_resources(self):
        """
        List all resources found in the filesystem and matching content-type by filename suffix
        """
        # yield just the basenames of the resolved resource paths
        found = list_all_resources(self.executable, ocrd_tool=self.ocrd_tool, moduled=self.moduledir)
        yield from (Path(resource).name for resource in found)
981
982
    @property
    def module(self):
        """
        The top-level module this processor belongs to.
        """
        # find the shortest dotted prefix of __module__ that is a real
        # (non-namespace) package, i.e. whose module object has a __file__
        parts = self.__module__.split('.')
        for end in range(1, len(parts) + 1):
            prefix = '.'.join(parts[:end])
            if getattr(sys.modules[prefix], '__file__', None):
                return prefix
        # fall-back: every prefix is a namespace package
        return self.__module__
997
998
    @property
    def moduledir(self):
        """
        The filesystem path of the module directory.
        """
        # resolve '.' relative to the package found via :py:attr:`module`
        directory = resource_filename(self.module, '.')
        return directory
1004
1005
    @property
    def input_files(self):
        """
        List the input files (for single-valued :py:attr:`input_file_grp`).

        For each physical page:

        - If there is a single PAGE-XML for the page, take it (and forget about all
          other files for that page)
        - Else if there is a single image file, take it (and forget about all other
          files for that page)
        - Otherwise raise an error (complaining that only PAGE-XML warrants
          having multiple images for a single page)

        See `algorithm <https://github.com/cisocrgroup/ocrd_cis/pull/57#issuecomment-656336593>`_

        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` objects.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")
        # delegate the page alignment to zip_input_files, demanding uniqueness
        zipped = self.zip_input_files(mimetype=None, on_error='abort')
        if not zipped:
            return []
        assert len(zipped[0]) == 1, 'Use zip_input_files() instead of input_files when processing multiple input fileGrps'
        # unwrap the 1-tuples into a flat list of files
        return [file_tuple[0] for file_tuple in zipped]
1031
1032
    def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
        """
        List tuples of input files (for multi-valued :py:attr:`input_file_grp`).

        Processors that expect/need multiple input file groups,
        cannot use :py:data:`input_files`. They must align (zip) input files
        across pages. This includes the case where not all pages
        are equally present in all file groups. It also requires
        making a consistent selection if there are multiple files
        per page.

        Following the OCR-D functional model, this function tries to
        find a single PAGE file per page, or fall back to a single
        image file per page. In either case, multiple matches per page
        are an error (see error handling below).
        This default behaviour can be changed by using a fixed MIME
        type filter via :py:attr:`mimetype`. But still, multiple matching
        files per page are an error.

        Single-page multiple-file errors are handled according to
        :py:attr:`on_error`:

        - if ``skip``, then the page for the respective fileGrp will be
          silently skipped (as if there was no match at all)
        - if ``first``, then the first matching file for the page will be
          silently selected (as if the first was the only match)
        - if ``last``, then the last matching file for the page will be
          silently selected (as if the last was the only match)
        - if ``abort``, then an exception will be raised.

        Multiple matches for PAGE-XML will always raise an exception.

        Keyword Args:
             require_first (boolean): If true, then skip a page entirely
                 whenever it is not available in the first input `fileGrp`.
             on_error (string): How to handle multiple file matches per page.
             mimetype (string): If not `None`, filter by the specified MIME
                 type (literal or regex prefixed by `//`). Otherwise prefer
                 PAGE or image.
        Returns:
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` tuples.
        """
        if not self.input_file_grp:
            raise ValueError("Processor is missing input fileGrp")

        ifgs = self.input_file_grp.split(",")

        def _resolve_conflict(ift, i, file_, ifg, err_mimetype):
            # Apply the on_error strategy when one fileGrp has multiple
            # matching files for the same page (shared by both the
            # mimetype-filtered and the unfiltered conflict case).
            if on_error == 'skip':
                ift[i] = None
            elif on_error == 'first':
                pass  # keep first match
            elif on_error == 'last':
                ift[i] = file_
            elif on_error == 'abort':
                raise NonUniqueInputFile(ifg, file_.pageId, err_mimetype)
            else:
                raise Exception("Unknown 'on_error' strategy '%s'" % on_error)

        # Iterating over all files repeatedly may seem inefficient at first sight,
        # but the unnecessary OcrdFile instantiations for posterior fileGrp filtering
        # can actually be much more costly than traversing the ltree.
        # This might depend on the number of pages vs number of fileGrps.

        pages = {}
        for i, ifg in enumerate(ifgs):
            files_ = sorted(self.workspace.mets.find_all_files(
                    pageId=self.page_id, fileGrp=ifg, mimetype=mimetype),
                                # sort by MIME type so PAGE comes before images
                                key=lambda file_: file_.mimetype)
            for file_ in files_:
                if not file_.pageId:
                    # ignore document-global files
                    continue
                ift = pages.setdefault(file_.pageId, [None]*len(ifgs))
                if ift[i]:
                    self._base_logger.debug(
                        f"another file {file_.ID} for page {file_.pageId} in input file group {ifg}")
                    # fileGrp has multiple files for this page ID
                    if mimetype:
                        # filter was active, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} of same MIME type {mimetype} - on_error={on_error}")
                        _resolve_conflict(ift, i, file_, ifg, mimetype)
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype != MIMETYPE_PAGE):
                        pass  # keep PAGE match
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
                          file_.mimetype == MIMETYPE_PAGE):
                        raise NonUniqueInputFile(ifg, file_.pageId, None)
                    else:
                        # filter was inactive but no PAGE is in control, this must not happen
                        self._base_logger.warning(
                            f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
                            f"conflicts with file {ift[i].ID} but no PAGE available - on_error={on_error}")
                        _resolve_conflict(ift, i, file_, ifg, None)
                else:
                    self._base_logger.debug(f"adding file {file_.ID} for page {file_.pageId} to input file group {ifg}")
                    ift[i] = file_
        # Warn if no files found but pageId was specified, because that might be due to invalid page_id (range)
        if self.page_id and not any(pages):
            # (typo fixed: 'orcd' -> 'ocrd')
            self._base_logger.critical(f"Could not find any files for selected pageId {self.page_id}.\n"
                                       f"compare '{self.page_id}' with the output of 'ocrd workspace list-page'.")
        ifts = []
        # use physical page order
        for page in self.workspace.mets.physical_pages:
            if page not in pages:
                continue
            ifiles = pages[page]
            for i, ifg in enumerate(ifgs):
                if not ifiles[i]:
                    # could be from non-unique with on_error=skip or from true gap
                    self._base_logger.error(f'Found no file for page {page} in file group {ifg}')
                    if config.OCRD_MISSING_INPUT == 'ABORT':
                        raise MissingInputFile(ifg, page, mimetype)
            if not any(ifiles):
                # must be from non-unique with on_error=skip
                self._base_logger.warning(f'Found no files for {page} - skipping')
                continue
            if ifiles[0] or not require_first:
                ifts.append(tuple(ifiles))
        return ifts
1160
1161
1162
_page_worker_processor = None
1163
"""
1164
This global binding for the processor is required to avoid
1165
squeezing the processor through a mp.Queue (which is impossible
1166
due to unpicklable attributes like .workspace.mets._tree anyway)
1167
when calling Processor.process_page_file as page worker processes
1168
in Processor.process_workspace. Forking allows inheriting global
1169
objects, and with the METS Server we do not mutate the local
1170
processor instance anyway.
1171
"""
1172
1173
1174
def _page_worker_set_ctxt(processor, log_queue):
    """
    Overwrites `ocrd.processor.base._page_worker_processor` instance
    for sharing with subprocesses in ProcessPoolExecutor initializer.
    """
    global _page_worker_processor
    _page_worker_processor = processor
    if not log_queue:
        return
    # funnel every log record of this worker process through one queue handler
    logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]
1184
1185
1186
def _page_worker(*input_files):
    """
    Wraps a `Processor.process_page_file` call as payload (call target)
    of the ProcessPoolExecutor workers.
    """
    # relies on _page_worker_set_ctxt having bound the processor globally
    # in this (forked) worker process beforehand
    _page_worker_processor.process_page_file(*input_files)
1192
1193
1194
def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
    """Generate a string describing the full CLI of this processor including params.

    Args:
         ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
         processor_instance (object, optional): the processor implementation
             (for adding any module/class/function docstrings)
         subcommand (string, optional): 'worker' or 'server'

    Raises:
         ValueError: if ``subcommand`` is given but neither 'worker' nor 'server'
    """
    doc_help = ''
    if processor_instance:
        module = inspect.getmodule(processor_instance)
        if module and module.__doc__:
            doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
        if processor_instance.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
        # Try to find the most concrete docstring among the various methods that an implementation
        # could overload, first serving.
        # In doing so, compare with Processor to avoid a glitch in the way py>=3.5 inherits docstrings.
        # (They are supposed to only repeat information inspect.getdoc, rather than inherit __doc__ itself.)
        for method in ['process_page_pcgts', 'process_page_file', 'process_workspace', 'process']:
            instance_method = getattr(processor_instance, method)
            superclass_method = getattr(Processor, method)
            if instance_method.__doc__ and instance_method.__doc__ != superclass_method.__doc__:
                doc_help += '\n' + inspect.cleandoc(instance_method.__doc__) + '\n'
                break
        if doc_help:
            doc_help = '\n\n' + wrap_text(doc_help, width=72,
                                          initial_indent='  > ',
                                          subsequent_indent='  > ',
                                          preserve_paragraphs=True)
    # advertise both subcommands, consistent with the "[worker|server]" usage line below
    subcommands = '''\
    worker      Start a processing worker rather than do local processing
    server      Start a processing server rather than do local processing
'''

    processing_worker_options = '''\
  --queue                         The RabbitMQ server address in format
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
                                  [amqp://admin:admin@localhost:5672]
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
  --log-filename                  Filename to redirect STDOUT/STDERR to,
                                  if specified.
'''

    processing_server_options = '''\
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
'''

    processing_options = '''\
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
  -I, --input-file-grp USE        File group(s) used as input
  -O, --output-file-grp USE       File group(s) used as output
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
  --overwrite                     Remove existing output pages/images
                                  (with "--page-id", remove only those).
                                  Short-hand for OCRD_EXISTING_OUTPUT=OVERWRITE
  --debug                         Abort on any errors with full stack trace.
                                  Short-hand for OCRD_MISSING_OUTPUT=ABORT
  --profile                       Enable profiling
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
                                  or JSON file path
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                  taking precedence over --parameter
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
                                  If URL starts with http:// start an HTTP server there,
                                  otherwise URL is a path to an on-demand-created unix socket
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
                                  Override log level globally [INFO]
  --log-filename LOG-PATH         File to redirect stderr logging to (overriding ocrd_logging.conf).
'''

    information_options = '''\
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
  -L, --list-resources            List names of processor resources
  -J, --dump-json                 Dump tool description as JSON
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
  -h, --help                      Show this message
  -V, --version                   Show version
'''

    parameter_help = ''
    if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']:
        parameter_help = '  NONE\n'
    else:
        def wrap(s):
            # indent/wrap one parameter description to the help layout
            return wrap_text(s, initial_indent=' ' * 3,
                             subsequent_indent=' ' * 4,
                             width=72, preserve_paragraphs=True)
        for param_name, param in ocrd_tool['parameters'].items():
            # "required" wins over "default"; a parameter with neither gets no suffix
            parameter_help += wrap('"%s" [%s%s]' % (
                param_name,
                param['type'],
                ' - REQUIRED' if 'required' in param and param['required'] else
                ' - %s' % json.dumps(param['default']) if 'default' in param else ''))
            parameter_help += '\n ' + wrap(param['description'])
            if 'enum' in param:
                parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum']))
            parameter_help += "\n"

    if not subcommand:
        return f'''\
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]

  {ocrd_tool['description']}{doc_help}

Subcommands:
{subcommands}
Options for processing:
{processing_options}
Options for information:
{information_options}
Parameters:
{parameter_help}
'''
    elif subcommand == 'worker':
        return f'''\
Usage: {ocrd_tool['executable']} worker [OPTIONS]

  Run {ocrd_tool['executable']} as a processing worker.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_worker_options}
'''
    elif subcommand == 'server':
        # previously missing: processing_server_options was built but never used,
        # and 'server' silently returned None
        return f'''\
Usage: {ocrd_tool['executable']} server [OPTIONS]

  Run {ocrd_tool['executable']} as a processing server.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_server_options}
'''
    else:
        # fail loudly instead of silently returning None for a typo'd subcommand
        raise ValueError(f"unknown subcommand: {subcommand}")
1327