Passed
Push — master ( 71c0c1...5d69e4 )
by Konstantin
02:52
created

ocrd.processor.base.Processor.verify()   C

Complexity

Conditions 9

Size

Total Lines 40
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 40
rs 6.6666
c 0
b 0
f 0
cc 9
nop 1
1
"""
2
Processor base class and helper functions.
3
"""
4
5
__all__ = [
6
    'Processor',
7
    'generate_processor_help',
8
    'run_cli',
9
    'run_processor'
10
]
11
12
from functools import cached_property
13
from os.path import exists, join
14
from shutil import copyfileobj
15
import json
16
import os
17
from os import getcwd
18
from pathlib import Path
19
from typing import Any, Dict, List, Optional, Tuple, Union, get_args
20
import sys
21
import logging
22
import logging.handlers
23
import inspect
24
import tarfile
25
import io
26
from collections import defaultdict
27
from frozendict import frozendict
28
# concurrent.futures is buggy in py38,
29
# this is where the fixes came from:
30
from loky import Future, ProcessPoolExecutor
31
import multiprocessing as mp
32
from threading import Timer
33
from _thread import interrupt_main
34
35
from click import wrap_text
36
from deprecated import deprecated
37
from requests import HTTPError
38
39
from ..workspace import Workspace
40
from ..mets_server import ClientSideOcrdMets
41
from ocrd_models.ocrd_file import OcrdFileType
42
from .ocrd_page_result import OcrdPageResult
43
from ocrd_utils import (
44
    VERSION as OCRD_VERSION,
45
    MIMETYPE_PAGE,
46
    MIME_TO_EXT,
47
    config,
48
    getLogger,
49
    list_resource_candidates,
50
    pushd_popd,
51
    list_all_resources,
52
    get_processor_resource_types,
53
    resource_filename,
54
    parse_json_file_with_comments,
55
    make_file_id,
56
    deprecation_warning
57
)
58
from ocrd_validators import ParameterValidator
59
from ocrd_models.ocrd_page import (
60
    PageType,
61
    AlternativeImageType,
62
    MetadataItemType,
63
    LabelType,
64
    LabelsType,
65
    OcrdPage,
66
    to_xml,
67
)
68
from ocrd_modelfactory import page_from_file
69
from ocrd_validators.ocrd_tool_validator import OcrdToolValidator
70
71
# XXX imports must remain for backwards-compatibility
72
from .helpers import run_cli, run_processor # pylint: disable=unused-import
73
74
75
class ResourceNotFoundError(FileNotFoundError):
76
    """
77
    An exception signifying the requested processor resource
78
    cannot be resolved.
79
    """
80
    def __init__(self, name, executable):
81
        self.name = name
82
        self.executable = executable
83
        self.message = (f"Could not find resource '{name}' for executable '{executable}'. "
84
                        f"Try 'ocrd resmgr download {executable} {name}' to download this resource.")
85
        super().__init__(self.message)
86
87
class NonUniqueInputFile(ValueError):
88
    """
89
    An exception signifying the specified fileGrp / pageId / mimetype
90
    selector yields multiple PAGE files, or no PAGE files but multiple images,
91
    or multiple files of that mimetype.
92
    """
93
    def __init__(self, fileGrp, pageId, mimetype):
94
        self.fileGrp = fileGrp
95
        self.pageId = pageId
96
        self.mimetype = mimetype
97
        self.message = (f"Could not determine unique input file for fileGrp {fileGrp} "
98
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
99
        super().__init__(self.message)
100
101
class MissingInputFile(ValueError):
102
    """
103
    An exception signifying the specified fileGrp / pageId / mimetype
104
    selector yields no PAGE files, or no PAGE and no image files,
105
    or no files of that mimetype.
106
    """
107
    def __init__(self, fileGrp, pageId, mimetype):
108
        self.fileGrp = fileGrp
109
        self.pageId = pageId
110
        self.mimetype = mimetype
111
        self.message = (f"Could not find input file for fileGrp {fileGrp} "
112
                        f"and pageId {pageId} under mimetype {mimetype or 'PAGE+image(s)'}")
113
        super().__init__(self.message)
114
115
class DummyFuture:
116
    """
117
    Mimics some of `concurrent.futures.Future` but runs immediately.
118
    """
119
    def __init__(self, fn, *args, **kwargs):
120
        self.fn = fn
121
        self.args = args
122
        self.kwargs = kwargs
123
    def result(self):
124
        return self.fn(*self.args, **self.kwargs)
125
class DummyExecutor:
126
    """
127
    Mimics some of `concurrent.futures.ProcessPoolExecutor` but runs
128
    everything immediately in this process.
129
    """
130
    def __init__(self, initializer=None, initargs=(), **kwargs):
131
        initializer(*initargs)
132
    def shutdown(self, **kwargs):
133
        pass
134
    def submit(self, fn, *args, **kwargs) -> DummyFuture:
135
        return DummyFuture(fn, *args, **kwargs)
136
137
TFuture = Union[DummyFuture, Future]
138
TExecutor = Union[DummyExecutor, ProcessPoolExecutor]
139
140
class Processor():
141
    """
142
    A processor is a tool that implements the uniform OCR-D
143
    `command-line interface for run-time data processing <https://ocr-d.de/en/spec/cli>`_.
144
145
    That is, it executes a single workflow step, or a combination of workflow steps,
146
    on the workspace (represented by local METS). It reads input files for all or selected
147
    physical pages of the input fileGrp(s), computes additional annotation, and writes output
148
    files for them into the output fileGrp(s). It may take a number of optional or mandatory
149
    parameters.
150
    """
151
152
    max_instances : int = -1
153
    """
154
    maximum number of cached instances (ignored if negative), to be applied on top of
155
    :py:data:`~ocrd_utils.config.OCRD_MAX_PROCESSOR_CACHE` (i.e. whatever is smaller).
156
157
    (Override this if you know how many instances fit into memory - GPU / CPU RAM - at once.)
158
    """
159
160
    max_workers : int = -1
161
    """
162
    maximum number of processor forks for page-parallel processing (ignored if negative),
163
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_MAX_PARALLEL_PAGES` (i.e.
164
    whatever is smaller).
165
166
    (Override this if you know how many pages fit into processing units - GPU shaders / CPU cores
167
    - at once, or if your class already creates threads prior to forking, e.g. during ``setup``.)
168
    """
169
170
    max_page_seconds : int = -1
171
    """
172
    maximum number of seconds may be spent processing a single page (ignored if negative),
173
    to be applied on top of :py:data:`~ocrd_utils.config.OCRD_PROCESSING_PAGE_TIMEOUT`
174
    (i.e. whatever is smaller).
175
176
    (Override this if you know how costly this processor may be, irrespective of image size
177
    or complexity of the page.)
178
    """
179
180
    @property
181
    def metadata_filename(self) -> str:
182
        """
183
        Relative location of the ``ocrd-tool.json`` file inside the package.
184
185
        Used by :py:data:`metadata_location`.
186
187
        (Override if ``ocrd-tool.json`` is not in the root of the module,
188
        e.g. ``namespace/ocrd-tool.json`` or ``data/ocrd-tool.json``).
189
        """
190
        return 'ocrd-tool.json'
191
192
    @cached_property
193
    def metadata_location(self) -> Path:
194
        """
195
        Absolute path of the ``ocrd-tool.json`` file as distributed with the package.
196
197
        Used by :py:data:`metadata_rawdict`.
198
199
        (Override if ``ocrd-tool.json`` is not distributed with the Python package.)
200
        """
201
        module = inspect.getmodule(self)
202
        module_tokens = module.__package__.split('.')
203
        # for namespace packages, we cannot just use the first token
204
        for i in range(len(module_tokens)):
205
            prefix = '.'.join(module_tokens[:i + 1])
206
            if sys.modules[prefix].__spec__.has_location:
207
                return resource_filename(prefix, self.metadata_filename)
208
        raise Exception("cannot find top-level module prefix for %s", module.__package__)
209
210
    @cached_property
211
    def metadata_rawdict(self) -> dict:
212
        """
213
        Raw (unvalidated, unexpanded) ``ocrd-tool.json`` dict contents of the package.
214
215
        Used by :py:data:`metadata`.
216
217
        (Override if ``ocrd-tool.json`` is not in a file.)
218
        """
219
        return parse_json_file_with_comments(self.metadata_location)
220
221
    @cached_property
222
    def metadata(self) -> dict:
223
        """
224
        The ``ocrd-tool.json`` dict contents of the package, according to the OCR-D
225
        `spec <https://ocr-d.de/en/spec/ocrd_tool>`_ for processor tools.
226
227
        After deserialisation, it also gets validated against the
228
        `schema <https://ocr-d.de/en/spec/ocrd_tool#definition>`_ with all defaults
229
        expanded.
230
231
        Used by :py:data:`ocrd_tool` and :py:data:`version`.
232
233
        (Override if you want to provide metadata programmatically instead of a
234
        JSON file.)
235
        """
236
        metadata = self.metadata_rawdict
237
        report = OcrdToolValidator.validate(metadata)
238
        if not report.is_valid:
239
            self.logger.error(f"The ocrd-tool.json of this processor is {'problematic' if not report.errors else 'invalid'}:\n"
240
                              f"{report.to_xml()}.\nPlease open an issue at {metadata.get('git_url', 'the website')}.")
241
        return metadata
242
243
    @cached_property
244
    def version(self) -> str:
245
        """
246
        The program version of the package.
247
        Usually the ``version`` part of :py:data:`metadata`.
248
249
        (Override if you do not want to use :py:data:`metadata` lookup
250
        mechanism.)
251
        """
252
        return self.metadata['version']
253
254
    @cached_property
255
    def executable(self) -> str:
256
        """
257
        The executable name of this processor tool. Taken from the runtime
258
        filename.
259
260
        Used by :py:data:`ocrd_tool` for lookup in :py:data:`metadata`.
261
262
        (Override if your entry-point name deviates from the ``executable``
263
        name, or the processor gets instantiated from another runtime.)
264
        """
265
        return os.path.basename(inspect.stack()[-1].filename)
266
267
    @cached_property
268
    def ocrd_tool(self) -> dict:
269
        """
270
        The ``ocrd-tool.json`` dict contents of this processor tool.
271
        Usually the :py:data:`executable` key of the ``tools`` part
272
        of :py:data:`metadata`.
273
274
        (Override if you do not want to use :py:data:`metadata` lookup
275
        mechanism.)
276
        """
277
        return self.metadata['tools'][self.executable]
278
279
    @property
280
    def parameter(self) -> Optional[dict]:
281
        """the runtime parameter dict to be used by this processor"""
282
        if hasattr(self, '_parameter'):
283
            return self._parameter
284
        return None
285
286
    @parameter.setter
287
    def parameter(self, parameter : dict) -> None:
288
        if self.parameter is not None:
289
            self.shutdown()
290
        parameterValidator = ParameterValidator(self.ocrd_tool)
291
        report = parameterValidator.validate(parameter)
292
        if not report.is_valid:
293
            raise ValueError(f'Invalid parameters:\n{report.to_xml()}')
294
        # make parameter dict read-only
295
        self._parameter = frozendict(parameter)
296
        # (re-)run setup to load models etc
297
        self.setup()
298
299
    def __init__(
300
            self,
301
            # FIXME: remove in favor of process_workspace(workspace)
302
            workspace : Optional[Workspace],
303
            ocrd_tool=None,
304
            parameter=None,
305
            input_file_grp=None,
306
            output_file_grp=None,
307
            page_id=None,
308
            download_files=config.OCRD_DOWNLOAD_INPUT,
309
            version=None
310
    ):
311
        """
312
        Instantiate, but do not setup (neither for processing nor other usage).
313
        If given, do parse and validate :py:data:`.parameter`.
314
315
        Args:
316
             workspace (:py:class:`~ocrd.Workspace`): The workspace to process. \
317
                 If not ``None``, then `chdir` to that directory.
318
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
319
                 before processing.
320
        Keyword Args:
321
             parameter (string): JSON of the runtime choices for ocrd-tool ``parameters``. \
322
                 Can be ``None`` even for processing, but then needs to be set before running.
323
             input_file_grp (string): comma-separated list of METS ``fileGrp`` used for input. \
324
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
325
                 before processing.
326
             output_file_grp (string): comma-separated list of METS ``fileGrp`` used for output. \
327
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
328
                 before processing.
329
             page_id (string): comma-separated list of METS physical ``page`` IDs to process \
330
                 (or empty for all pages). \
331
                 Deprecated since version 3.0: Should be ``None`` here, but then needs to be set \
332
                 before processing.
333
             download_files (boolean): Whether input files will be downloaded prior to processing, \
334
                 defaults to :py:attr:`ocrd_utils.config.OCRD_DOWNLOAD_INPUT` which is ``True`` by default
335
        """
336
        if ocrd_tool is not None:
337
            deprecation_warning("Passing 'ocrd_tool' as keyword argument to Processor is deprecated - "
338
                                "use or override metadata/executable/ocrd-tool properties instead")
339
            self.ocrd_tool = ocrd_tool
340
            self.executable = ocrd_tool['executable']
341
        if version is not None:
342
            deprecation_warning("Passing 'version' as keyword argument to Processor is deprecated - "
343
                                "use or override metadata/version properties instead")
344
            self.version = version
345
        if workspace is not None:
346
            deprecation_warning("Passing a workspace argument other than 'None' to Processor "
347
                                "is deprecated - pass as argument to process_workspace instead")
348
            self.workspace = workspace
349
            self.old_pwd = getcwd()
350
            os.chdir(self.workspace.directory)
351
        if input_file_grp is not None:
352
            deprecation_warning("Passing an input_file_grp kwarg other than 'None' to Processor "
353
                                "is deprecated - pass as argument to process_workspace instead")
354
            self.input_file_grp = input_file_grp
355
        if output_file_grp is not None:
356
            deprecation_warning("Passing an output_file_grp kwarg other than 'None' to Processor "
357
                                "is deprecated - pass as argument to process_workspace instead")
358
            self.output_file_grp = output_file_grp
359
        if page_id is not None:
360
            deprecation_warning("Passing a page_id kwarg other than 'None' to Processor "
361
                                "is deprecated - pass as argument to process_workspace instead")
362
            self.page_id = page_id or None
363
        self.download = download_files
364
        #: The logger to be used by processor implementations.
365
        # `ocrd.processor.base` internals should use :py:attr:`self._base_logger`
366
        self.logger = getLogger(f'ocrd.processor.{self.__class__.__name__}')
367
        self._base_logger = getLogger('ocrd.processor.base')
368
        if parameter is not None:
369
            self.parameter = parameter
370
        # workaround for deprecated#72 (@deprecated decorator does not work for subclasses):
371
        setattr(self, 'process',
372
                deprecated(version='3.0', reason='process() should be replaced with process_page_pcgts() or process_page_file() or process_workspace()')(getattr(self, 'process')))
373
374
    def __del__(self):
375
        self._base_logger.debug("shutting down")
376
        self.shutdown()
377
378
    def show_help(self, subcommand=None):
379
        """
380
        Print a usage description including the standard CLI and all of this processor's ocrd-tool
381
        parameters and docstrings.
382
        """
383
        print(generate_processor_help(self.ocrd_tool, processor_instance=self, subcommand=subcommand))
384
385
    def show_version(self):
386
        """
387
        Print information on this processor's version and OCR-D version.
388
        """
389
        print("Version %s, ocrd/core %s" % (self.version, OCRD_VERSION))
390
391
    def verify(self):
392
        """
393
        Verify that :py:attr:`input_file_grp` and :py:attr:`output_file_grp` fulfill the processor's requirements.
394
        """
395
        # verify input and output file groups in parameters
396
        assert self.input_file_grp is not None
397
        assert self.output_file_grp is not None
398
        input_file_grps = self.input_file_grp.split(',')
399
        output_file_grps = self.output_file_grp.split(',')
400
        def assert_file_grp_cardinality(grps : List[str], spec : Union[int, List[int]], msg):
401
            if isinstance(spec, int):
402
                if spec > 0:
403
                    assert len(grps) == spec, msg % (len(grps), str(spec))
404
            else:
405
                assert isinstance(spec, list)
406
                minimum = spec[0]
407
                maximum = spec[1]
408
                if minimum > 0:
409
                    assert len(grps) >= minimum, msg % (len(grps), str(spec))
410
                if maximum > 0:
411
                    assert len(grps) <= maximum, msg % (len(grps), str(spec))
412
        # FIXME: enforce unconditionally as soon as grace period for deprecation is over
413
        if 'input_file_grp_cardinality' in self.ocrd_tool:
414
            assert_file_grp_cardinality(input_file_grps, self.ocrd_tool['input_file_grp_cardinality'],
415
                                        "Unexpected number of input file groups %d vs %s")
416
        if 'output_file_grp_cardinality' in self.ocrd_tool:
417
            assert_file_grp_cardinality(output_file_grps, self.ocrd_tool['output_file_grp_cardinality'],
418
                                        "Unexpected number of output file groups %d vs %s")
419
        # verify input and output file groups in METS
420
        for input_file_grp in input_file_grps:
421
            assert input_file_grp in self.workspace.mets.file_groups, \
422
                f"input fileGrp {input_file_grp} does not exist in workspace {self.workspace}"
423
        for output_file_grp in output_file_grps:
424
            assert output_file_grp not in self.workspace.mets.file_groups \
425
                or config.OCRD_EXISTING_OUTPUT in ['OVERWRITE', 'SKIP'] \
426
                or not any(self.workspace.mets.find_files(
427
                    pageId=self.page_id, fileGrp=output_file_grp)), \
428
                    f"output fileGrp {output_file_grp} already exists in workspace {self.workspace}"
429
        # keep this for backwards compatibility:
430
        return True
431
432
    def dump_json(self):
433
        """
434
        Print :py:attr:`ocrd_tool` on stdout.
435
        """
436
        print(json.dumps(self.ocrd_tool, indent=True))
437
438
    def dump_module_dir(self):
439
        """
440
        Print :py:attr:`moduledir` on stdout.
441
        """
442
        print(self.moduledir)
443
444
    def list_resources(self):
445
        """
446
        Find all installed resource files in the search paths and print their path names.
447
        """
448
        for res in self.list_all_resources():
449
            print(res)
450
451
    def setup(self) -> None:
452
        """
453
        Prepare the processor for actual data processing,
454
        prior to changing to the workspace directory but
455
        after parsing parameters.
456
457
        (Override this to load models into memory etc.)
458
        """
459
        pass
460
461
    def shutdown(self) -> None:
462
        """
463
        Bring down the processor after data processing,
464
        after to changing back from the workspace directory but
465
        before exiting (or setting up with different parameters).
466
467
        (Override this to unload models from memory etc.)
468
        """
469
        pass
470
471
    @deprecated(version='3.0', reason='process() should be replaced with process_page_pcgts() or process_page_file() or process_workspace()')
472
    def process(self) -> None:
473
        """
474
        Process all files of the :py:data:`workspace`
475
        from the given :py:data:`input_file_grp`
476
        to the given :py:data:`output_file_grp`
477
        for the given :py:data:`page_id` (or all pages)
478
        under the given :py:data:`parameter`.
479
480
        (This contains the main functionality and needs to be
481
        overridden by subclasses.)
482
        """
483
        raise NotImplementedError()
484
485
    def process_workspace(self, workspace: Workspace) -> None:
486
        """
487
        Process all files of the given ``workspace``,
488
        from the given :py:data:`input_file_grp`
489
        to the given :py:data:`output_file_grp`
490
        for the given :py:data:`page_id` (or all pages)
491
        under the given :py:data:`parameter`.
492
493
        Delegates to :py:meth:`.process_workspace_submit_tasks`
494
        and :py:meth:`.process_workspace_handle_tasks`.
495
496
        (This will iterate over pages and files, calling
497
        :py:meth:`.process_page_file` and handling exceptions.
498
        It should be overridden by subclasses to handle cases
499
        like post-processing or computation across pages.)
500
        """
501
        with pushd_popd(workspace.directory):
502
            self.workspace = workspace
503
            self.verify()
504
            try:
505
                # set up multitasking
506
                max_workers = max(0, config.OCRD_MAX_PARALLEL_PAGES)
507
                if self.max_workers > 0 and self.max_workers < config.OCRD_MAX_PARALLEL_PAGES:
508
                    self._base_logger.info("limiting number of threads from %d to %d", max_workers, self.max_workers)
509
                    max_workers = self.max_workers
510
                if max_workers > 1:
511
                    assert isinstance(workspace.mets, ClientSideOcrdMets), \
512
                        "OCRD_MAX_PARALLEL_PAGES>1 requires also using --mets-server-url"
513
                max_seconds = max(0, config.OCRD_PROCESSING_PAGE_TIMEOUT)
514
                if self.max_page_seconds > 0 and self.max_page_seconds < config.OCRD_PROCESSING_PAGE_TIMEOUT:
515
                    self._base_logger.info("limiting page timeout from %d to %d sec", max_seconds, self.max_page_seconds)
516
                    max_seconds = self.max_page_seconds
517
518
                if max_workers > 1:
519
                    executor_cls = ProcessPoolExecutor
520
                    log_queue = mp.Queue()
521
                    # forward messages from log queue (in subprocesses) to all root handlers
522
                    log_listener = logging.handlers.QueueListener(log_queue, *logging.root.handlers, respect_handler_level=True)
523
                else:
524
                    executor_cls = DummyExecutor
525
                    log_queue = None
526
                    log_listener = None
527
                executor = executor_cls(
528
                    max_workers=max_workers or 1,
529
                    # only forking method avoids pickling
530
                    context=mp.get_context('fork'),
531
                    # share processor instance as global to avoid pickling
532
                    initializer=_page_worker_set_ctxt,
533
                    initargs=(self, log_queue),
534
                )
535
                if max_workers > 1:
536
                    log_listener.start()
537
                try:
538
                    self._base_logger.debug("started executor %s with %d workers", str(executor), max_workers or 1)
539
                    tasks = self.process_workspace_submit_tasks(executor, max_seconds)
540
                    stats = self.process_workspace_handle_tasks(tasks)
541
                finally:
542
                    executor.shutdown(kill_workers=True, wait=False)
543
                    if max_workers > 1:
544
                        log_listener.stop()
545
546
            except NotImplementedError:
547
                # fall back to deprecated method
548
                try:
549
                    self.process()
550
                except Exception as err:
551
                    # suppress the NotImplementedError context
552
                    raise err from None
553
554
    def process_workspace_submit_tasks(self, executor : TExecutor, max_seconds : int) -> Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]:
555
        """
556
        Look up all input files of the given ``workspace``
557
        from the given :py:data:`input_file_grp`
558
        for the given :py:data:`page_id` (or all pages),
559
        and schedules calling :py:meth:`.process_page_file`
560
        on them for each page via `executor` (enforcing
561
        a per-page time limit of `max_seconds`).
562
563
        When running with `OCRD_MAX_PARALLEL_PAGES>1` and
564
        the workspace via METS Server, the executor will fork
565
        this many worker parallel subprocesses each processing
566
        one page at a time. (Interprocess communication is
567
        done via task and result queues.)
568
569
        Otherwise, tasks are run sequentially in the
570
        current process.
571
572
        Delegates to :py:meth:`.zip_input_files` to get 
573
        the input files for each page, and then calls
574
        :py:meth:`.process_workspace_submit_page_task`.
575
576
        Returns a dict mapping the per-page tasks
577
        (i.e. futures submitted to the executor)
578
        to their corresponding pageId and input files.
579
        """
580
        tasks = {}
581
        for input_file_tuple in self.zip_input_files(on_error='abort', require_first=False):
582
            task, page_id, input_files = self.process_workspace_submit_page_task(executor, max_seconds, input_file_tuple)
583
            tasks[task] = (page_id, input_files)
584
        self._base_logger.debug("submitted %d processing tasks", len(tasks))
585
        return tasks
586
587
    def process_workspace_submit_page_task(self, executor : TExecutor, max_seconds : int, input_file_tuple : List[Optional[OcrdFileType]]) -> Tuple[TFuture, str, List[Optional[OcrdFileType]]]:
588
        """
589
        Ensure all input files for a single page are
590
        downloaded to the workspace, then schedule
591
        :py:meth:`.process_process_file` to be run on
592
        them via `executor` (enforcing a per-page time
593
        limit of `max_seconds`).
594
595
        Delegates to :py:meth:`.process_page_file`
596
        (wrapped in :py:func:`_page_worker` to share
597
        the processor instance across forked processes).
598
599
        \b
600
        Returns a tuple of:
601
        - the scheduled future object,
602
        - the corresponding pageId,
603
        - the corresponding input files.
604
        """
605
        input_files : List[Optional[OcrdFileType]] = [None] * len(input_file_tuple)
606
        page_id = next(input_file.pageId
607
                       for input_file in input_file_tuple
608
                       if input_file)
609
        self._base_logger.info(f"preparing page {page_id}")
610
        for i, input_file in enumerate(input_file_tuple):
611
            if input_file is None:
612
                # file/page not found in this file grp
613
                continue
614
            input_files[i] = input_file
615
            if not self.download:
616
                continue
617
            try:
618
                input_files[i] = self.workspace.download_file(input_file)
619
            except (ValueError, FileNotFoundError, HTTPError) as e:
620
                self._base_logger.error(repr(e))
621
                self._base_logger.warning(f"failed downloading file {input_file} for page {page_id}")
622
        # process page
623
        #executor.submit(self.process_page_file, *input_files)
624
        return executor.submit(_page_worker, max_seconds, *input_files), page_id, input_files
625
626
    def process_workspace_handle_tasks(self, tasks : Dict[TFuture, Tuple[str, List[Optional[OcrdFileType]]]]) -> Tuple[int, int, Dict[str, int], int]:
627
        """
628
        Look up scheduled per-page futures one by one,
629
        handle errors (exceptions) and gather results.
630
631
        \b
632
        Enforces policies configured by the following
633
        environment variables:
634
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
635
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
636
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).
637
638
        \b
639
        Returns a tuple of:
640
        - the number of successfully processed pages
641
        - the number of failed (i.e. skipped or copied) pages
642
        - a dict of the type and corresponding number of exceptions seen
643
        - the number of total requested pages (i.e. success+fail+existing).
644
645
        Delegates to :py:meth:`.process_workspace_handle_page_task`
646
        for each page.
647
        """
648
        # aggregate info for logging:
649
        nr_succeeded = 0
650
        nr_failed = 0
651
        nr_errors = defaultdict(int) # count causes
652
        if config.OCRD_MISSING_OUTPUT == 'SKIP':
653
            reason = "skipped"
654
        elif config.OCRD_MISSING_OUTPUT == 'COPY':
655
            reason = "fallback-copied"
656
        for task in tasks:
657
            # wait for results, handle errors
658
            page_id, input_files = tasks[task]
659
            result = self.process_workspace_handle_page_task(page_id, input_files, task)
660
            if isinstance(result, Exception):
661
                nr_errors[result.__class__.__name__] += 1
662
                nr_failed += 1
663
                # FIXME: this is just prospective, because len(tasks)==nr_failed+nr_succeeded is not guaranteed
664
                if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / len(tasks) > config.OCRD_MAX_MISSING_OUTPUTS:
665
                    # already irredeemably many failures, stop short
666
                    nr_errors = dict(nr_errors)
667
                    raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_failed+nr_succeeded}, {str(nr_errors)})")
668
            elif result:
669
                nr_succeeded += 1
670
            # else skipped - already exists
671
        nr_errors = dict(nr_errors)
672
        if nr_failed > 0:
673
            nr_all = nr_succeeded + nr_failed
674
            if config.OCRD_MAX_MISSING_OUTPUTS > 0 and nr_failed / nr_all > config.OCRD_MAX_MISSING_OUTPUTS:
675
                raise Exception(f"too many failures with {reason} output ({nr_failed} of {nr_all}, {str(nr_errors)})")
676
            self._base_logger.warning("%s %d of %d pages due to %s", reason, nr_failed, nr_all, str(nr_errors))
0 ignored issues
show
introduced by
The variable reason does not seem to be defined for all execution paths.
Loading history...
677
        return nr_succeeded, nr_failed, nr_errors, len(tasks)
678
679
    def process_workspace_handle_page_task(self, page_id : str, input_files : List[Optional[OcrdFileType]], task : TFuture) -> Union[bool, Exception]:
680
        """
681
        \b
682
        Await a single page result and handle errors (exceptions), 
683
        enforcing policies configured by the following
684
        environment variables:
685
        - `OCRD_EXISTING_OUTPUT` (abort/skip/overwrite)
686
        - `OCRD_MISSING_OUTPUT` (abort/skip/fallback-copy)
687
        - `OCRD_MAX_MISSING_OUTPUTS` (abort after all).
688
689
        \b
690
        Returns
691
        - true in case of success
692
        - false in case the output already exists
693
        - the exception in case of failure
694
        """
695
        # FIXME: differentiate error cases in various ways:
696
        # - ResourceNotFoundError → use ResourceManager to download (once), then retry
697
        # - transient (I/O or OOM) error → maybe sleep, retry
698
        # - persistent (data) error → skip / dummy / raise
699
        try:
700
            self._base_logger.debug("waiting for output of task %s (page %s)", task, page_id)
701
            # timeout kwarg on future is useless: it only raises TimeoutError here,
702
            # but does not stop the running process/thread, and executor itself
703
            # offers nothing to that effect:
704
            # task.result(timeout=max_seconds or None)
705
            # so we instead applied the timeout within the worker function
706
            task.result()
707
            return True
708
        except NotImplementedError:
709
            # exclude NotImplementedError, so we can try process() below
710
            raise
711
        # handle input failures separately
712
        except FileExistsError as err:
713
            if config.OCRD_EXISTING_OUTPUT == 'ABORT':
714
                raise err
715
            if config.OCRD_EXISTING_OUTPUT == 'SKIP':
716
                return False
717
            if config.OCRD_EXISTING_OUTPUT == 'OVERWRITE':
718
                # too late here, must not happen
719
                raise Exception(f"got {err} despite OCRD_EXISTING_OUTPUT==OVERWRITE")
720
        except KeyboardInterrupt:
721
            raise
722
        # broad coverage of output failures (including TimeoutError)
723
        except Exception as err:
724
            # FIXME: add re-usable/actionable logging
725
            if config.OCRD_MISSING_OUTPUT == 'ABORT':
726
                self._base_logger.error(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
727
                raise err
728
            self._base_logger.exception(f"Failure on page {page_id}: {str(err) or err.__class__.__name__}")
729
            if config.OCRD_MISSING_OUTPUT == 'SKIP':
730
                pass
731
            elif config.OCRD_MISSING_OUTPUT == 'COPY':
732
                self._copy_page_file(input_files[0])
733
            else:
734
                desc = config.describe('OCRD_MISSING_OUTPUT', wrap_text=False, indent_text=False)
735
                raise ValueError(f"unknown configuration value {config.OCRD_MISSING_OUTPUT} - {desc}")
736
            return err
737
738
    def _copy_page_file(self, input_file : OcrdFileType) -> None:
739
        """
740
        Copy the given ``input_file`` of the :py:data:`workspace`,
741
        representing one physical page (passed as one opened
742
        :py:class:`~ocrd_models.OcrdFile` per input fileGrp)
743
        and add it as if it was a processing result.
744
        """
745
        input_pcgts : OcrdPage
746
        assert isinstance(input_file, get_args(OcrdFileType))
747
        self._base_logger.debug(f"parsing file {input_file.ID} for page {input_file.pageId}")
748
        try:
749
            input_pcgts = page_from_file(input_file)
750
        except ValueError as err:
751
            # not PAGE and not an image to generate PAGE for
752
            self._base_logger.error(f"non-PAGE input for page {input_file.pageId}: {err}")
753
            return
754
        output_file_id = make_file_id(input_file, self.output_file_grp)
755
        input_pcgts.set_pcGtsId(output_file_id)
756
        self.add_metadata(input_pcgts)
757
        self.workspace.add_file(
758
            file_id=output_file_id,
759
            file_grp=self.output_file_grp,
760
            page_id=input_file.pageId,
761
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
762
            mimetype=MIMETYPE_PAGE,
763
            content=to_xml(input_pcgts),
764
        )
765
766
    def process_page_file(self, *input_files : Optional[OcrdFileType]) -> None:
767
        """
768
        Process the given ``input_files`` of the :py:data:`workspace`,
769
        representing one physical page (passed as one opened
770
        :py:class:`.OcrdFile` per input fileGrp)
771
        under the given :py:data:`.parameter`, and make sure the
772
        results get added accordingly.
773
774
        (This uses :py:meth:`.process_page_pcgts`, but should be overridden by subclasses
775
        to handle cases like multiple output fileGrps, non-PAGE input etc.)
776
        """
777
        input_pcgts : List[Optional[OcrdPage]] = [None] * len(input_files)
778
        assert isinstance(input_files[0], get_args(OcrdFileType))
779
        page_id = input_files[0].pageId
780
        self._base_logger.info("processing page %s", page_id)
781
        for i, input_file in enumerate(input_files):
782
            assert isinstance(input_file, get_args(OcrdFileType))
783
            self._base_logger.debug(f"parsing file {input_file.ID} for page {page_id}")
784
            try:
785
                page_ = page_from_file(input_file)
786
                assert isinstance(page_, OcrdPage)
787
                input_pcgts[i] = page_
788
            except ValueError as err:
789
                # not PAGE and not an image to generate PAGE for
790
                self._base_logger.error(f"non-PAGE input for page {page_id}: {err}")
791
        output_file_id = make_file_id(input_files[0], self.output_file_grp)
792
        output_file = next(self.workspace.mets.find_files(ID=output_file_id), None)
793
        if output_file and config.OCRD_EXISTING_OUTPUT != 'OVERWRITE':
794
            # short-cut avoiding useless computation:
795
            raise FileExistsError(
796
                f"A file with ID=={output_file_id} already exists {output_file} and neither force nor ignore are set"
797
            )
798
        result = self.process_page_pcgts(*input_pcgts, page_id=page_id)
799
        for image_result in result.images:
800
            image_file_id = f'{output_file_id}_{image_result.file_id_suffix}'
801
            image_file_path = join(self.output_file_grp, f'{image_file_id}.png')
802
            if isinstance(image_result.alternative_image, PageType):
803
                # special case: not an alternative image, but replacing the original image
804
                # (this is needed by certain processors when the original's coordinate system
805
                #  cannot or must not be kept)
806
                image_result.alternative_image.set_imageFilename(image_file_path)
807
                image_result.alternative_image.set_imageWidth(image_result.pil.width)
808
                image_result.alternative_image.set_imageHeight(image_result.pil.height)
809
            elif isinstance(image_result.alternative_image, AlternativeImageType):
810
                image_result.alternative_image.set_filename(image_file_path)
811
            elif image_result.alternative_image is None:
812
                pass # do not reference in PAGE result
813
            else:
814
                raise ValueError(f"process_page_pcgts returned an OcrdPageResultImage of unknown type "
815
                                 f"{type(image_result.alternative_image)}")
816
            self.workspace.save_image_file(
817
                image_result.pil,
818
                image_file_id,
819
                self.output_file_grp,
820
                page_id=page_id,
821
                file_path=image_file_path,
822
            )
823
        result.pcgts.set_pcGtsId(output_file_id)
824
        self.add_metadata(result.pcgts)
825
        self.workspace.add_file(
826
            file_id=output_file_id,
827
            file_grp=self.output_file_grp,
828
            page_id=page_id,
829
            local_filename=os.path.join(self.output_file_grp, output_file_id + '.xml'),
830
            mimetype=MIMETYPE_PAGE,
831
            content=to_xml(result.pcgts),
832
        )
833
834
    def process_page_pcgts(self, *input_pcgts : Optional[OcrdPage], page_id : Optional[str] = None) -> OcrdPageResult:
835
        """
836
        Process the given ``input_pcgts`` of the :py:data:`.workspace`,
837
        representing one physical page (passed as one parsed
838
        :py:class:`.OcrdPage` per input fileGrp)
839
        under the given :py:data:`.parameter`, and return the
840
        resulting :py:class:`.OcrdPageResult`.
841
842
        Optionally, add to the ``images`` attribute of the resulting
843
        :py:class:`.OcrdPageResult` instances of :py:class:`.OcrdPageResultImage`,
844
        which have required fields for ``pil`` (:py:class:`PIL.Image` image data),
845
        ``file_id_suffix`` (used for generating IDs of the saved image) and
846
        ``alternative_image`` (reference of the :py:class:`ocrd_models.ocrd_page.AlternativeImageType`
847
        for setting the filename of the saved image).
848
849
        (This contains the main functionality and must be overridden by subclasses,
850
        unless it does not get called by some overriden :py:meth:`.process_page_file`.)
851
        """
852
        raise NotImplementedError()
853
854
    def add_metadata(self, pcgts: OcrdPage) -> None:
855
        """
856
        Add PAGE-XML :py:class:`~ocrd_models.ocrd_page.MetadataItemType` ``MetadataItem`` describing
857
        the processing step and runtime parameters to :py:class:`.OcrdPage` ``pcgts``.
858
        """
859
        metadata_obj = pcgts.get_Metadata()
860
        assert metadata_obj is not None
861
        metadata_obj.add_MetadataItem(
862
                MetadataItemType(type_="processingStep",
863
                    name=self.ocrd_tool['steps'][0],
864
                    value=self.ocrd_tool['executable'],
865
                    Labels=[LabelsType(
866
                        externalModel="ocrd-tool",
867
                        externalId="parameters",
868
                        Label=[LabelType(type_=name,
869
                                         value=self.parameter[name])
870
                               for name in self.parameter.keys()]),
871
                            LabelsType(
872
                        externalModel="ocrd-tool",
873
                        externalId="version",
874
                        Label=[LabelType(type_=self.ocrd_tool['executable'],
875
                                         value=self.version),
876
                               LabelType(type_='ocrd/core',
877
                                         value=OCRD_VERSION)])
878
                    ]))
879
880
    def resolve_resource(self, val):
881
        """
882
        Resolve a resource name to an absolute file path with the algorithm in
883
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_
884
885
        Args:
886
            val (string): resource value to resolve
887
        """
888
        executable = self.ocrd_tool['executable']
889
        if exists(val):
890
            self._base_logger.debug("Resolved to absolute path %s" % val)
891
            return val
892
        # FIXME: remove once workspace arg / old_pwd is gone:
893
        if hasattr(self, 'old_pwd'):
894
            cwd = self.old_pwd
895
        else:
896
            cwd = getcwd()
897
        ret = [cand for cand in list_resource_candidates(executable, val,
898
                                                         cwd=cwd, moduled=self.moduledir)
899
               if exists(cand)]
900
        if ret:
901
            self._base_logger.debug("Resolved %s to absolute path %s" % (val, ret[0]))
902
            return ret[0]
903
        raise ResourceNotFoundError(val, executable)
904
905
    def show_resource(self, val):
906
        """
907
        Resolve a resource name to a file path with the algorithm in
908
        `spec <https://ocr-d.de/en/spec/ocrd_tool#file-parameters>`_,
909
        then print its contents to stdout.
910
911
        Args:
912
            val (string): resource value to show
913
        """
914
        res_fname = self.resolve_resource(val)
915
        fpath = Path(res_fname)
916
        if fpath.is_dir():
917
            with pushd_popd(fpath):
918
                fileobj = io.BytesIO()
919
                with tarfile.open(fileobj=fileobj, mode='w:gz') as tarball:
920
                    tarball.add('.')
921
                fileobj.seek(0)
922
                copyfileobj(fileobj, sys.stdout.buffer)
923
        else:
924
            sys.stdout.buffer.write(fpath.read_bytes())
925
926
    def list_all_resources(self):
927
        """
928
        List all resources found in the filesystem and matching content-type by filename suffix
929
        """
930
        mimetypes = get_processor_resource_types(None, self.ocrd_tool)
931
        for res in list_all_resources(self.ocrd_tool['executable'], moduled=self.moduledir):
932
            res = Path(res)
933
            if not '*/*' in mimetypes:
934
                if res.is_dir() and not 'text/directory' in mimetypes:
935
                    continue
936
                # if we do not know all MIME types, then keep the file, otherwise require suffix match
937
                if res.is_file() and not any(res.suffix == MIME_TO_EXT.get(mime, res.suffix)
938
                                             for mime in mimetypes):
939
                    continue
940
            yield res
941
942
    @property
943
    def module(self):
944
        """
945
        The top-level module this processor belongs to.
946
        """
947
        # find shortest prefix path that is not just a namespace package
948
        fqname = ''
949
        for name in self.__module__.split('.'):
950
            if fqname:
951
                fqname += '.'
952
            fqname += name
953
            if getattr(sys.modules[fqname], '__file__', None):
954
                return fqname
955
        # fall-back
956
        return self.__module__
957
958
    @property
959
    def moduledir(self):
960
        """
961
        The filesystem path of the module directory.
962
        """
963
        return resource_filename(self.module, '.')
964
965
    @property
966
    def input_files(self):
967
        """
968
        List the input files (for single-valued :py:attr:`input_file_grp`).
969
970
        For each physical page:
971
972
        - If there is a single PAGE-XML for the page, take it (and forget about all
973
          other files for that page)
974
        - Else if there is a single image file, take it (and forget about all other
975
          files for that page)
976
        - Otherwise raise an error (complaining that only PAGE-XML warrants
977
          having multiple images for a single page)
978
979
        See `algorithm <https://github.com/cisocrgroup/ocrd_cis/pull/57#issuecomment-656336593>`_
980
981
        Returns:
982
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` objects.
983
        """
984
        if not self.input_file_grp:
985
            raise ValueError("Processor is missing input fileGrp")
986
        ret = self.zip_input_files(mimetype=None, on_error='abort')
987
        if not ret:
988
            return []
989
        assert len(ret[0]) == 1, 'Use zip_input_files() instead of input_files when processing multiple input fileGrps'
990
        return [tuples[0] for tuples in ret]
991
992
    def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
993
        """
994
        List tuples of input files (for multi-valued :py:attr:`input_file_grp`).
995
996
        Processors that expect/need multiple input file groups,
997
        cannot use :py:data:`input_files`. They must align (zip) input files
998
        across pages. This includes the case where not all pages
999
        are equally present in all file groups. It also requires
1000
        making a consistent selection if there are multiple files
1001
        per page.
1002
1003
        Following the OCR-D functional model, this function tries to
1004
        find a single PAGE file per page, or fall back to a single
1005
        image file per page. In either case, multiple matches per page
1006
        are an error (see error handling below).
1007
        This default behaviour can be changed by using a fixed MIME
1008
        type filter via :py:attr:`mimetype`. But still, multiple matching
1009
        files per page are an error.
1010
1011
        Single-page multiple-file errors are handled according to
1012
        :py:attr:`on_error`:
1013
1014
        - if ``skip``, then the page for the respective fileGrp will be
1015
          silently skipped (as if there was no match at all)
1016
        - if ``first``, then the first matching file for the page will be
1017
          silently selected (as if the first was the only match)
1018
        - if ``last``, then the last matching file for the page will be
1019
          silently selected (as if the last was the only match)
1020
        - if ``abort``, then an exception will be raised.
1021
1022
        Multiple matches for PAGE-XML will always raise an exception.
1023
1024
        Keyword Args:
1025
             require_first (boolean): If true, then skip a page entirely
1026
                 whenever it is not available in the first input `fileGrp`.
1027
             on_error (string): How to handle multiple file matches per page.
1028
             mimetype (string): If not `None`, filter by the specified MIME
1029
                 type (literal or regex prefixed by `//`). Otherwise prefer
1030
                 PAGE or image.
1031
        Returns:
1032
            A list of :py:class:`ocrd_models.ocrd_file.OcrdFile` tuples.
1033
        """
1034
        if not self.input_file_grp:
1035
            raise ValueError("Processor is missing input fileGrp")
1036
1037
        ifgs = self.input_file_grp.split(",")
1038
        # Iterating over all files repeatedly may seem inefficient at first sight,
1039
        # but the unnecessary OcrdFile instantiations for posterior fileGrp filtering
1040
        # can actually be much more costly than traversing the ltree.
1041
        # This might depend on the number of pages vs number of fileGrps.
1042
1043
        pages = {}
1044
        for i, ifg in enumerate(ifgs):
1045
            files_ = sorted(self.workspace.mets.find_all_files(
1046
                    pageId=self.page_id, fileGrp=ifg, mimetype=mimetype),
1047
                                # sort by MIME type so PAGE comes before images
1048
                                key=lambda file_: file_.mimetype)
1049
            for file_ in files_:
1050
                if not file_.pageId:
1051
                    # ignore document-global files
1052
                    continue
1053
                ift = pages.setdefault(file_.pageId, [None]*len(ifgs))
1054
                if ift[i]:
1055
                    self._base_logger.debug(f"another file {file_.ID} for page {file_.pageId} in input file group {ifg}")
1056
                    # fileGrp has multiple files for this page ID
1057
                    if mimetype:
1058
                        # filter was active, this must not happen
1059
                        self._base_logger.warning(f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
1060
                                                  f"conflicts with file {ift[i].ID} of same MIME type {mimetype} - on_error={on_error}")
1061 View Code Duplication
                        if on_error == 'skip':
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1062
                            ift[i] = None
1063
                        elif on_error == 'first':
1064
                            pass # keep first match
1065
                        elif on_error == 'last':
1066
                            ift[i] = file_
1067
                        elif on_error == 'abort':
1068
                            raise NonUniqueInputFile(ifg, file_.pageId, mimetype)
1069
                        else:
1070
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
1071
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
1072
                          file_.mimetype != MIMETYPE_PAGE):
1073
                        pass # keep PAGE match
1074
                    elif (ift[i].mimetype == MIMETYPE_PAGE and
1075
                          file_.mimetype == MIMETYPE_PAGE):
1076
                        raise NonUniqueInputFile(ifg, file_.pageId, None)
1077
                    else:
1078
                        # filter was inactive but no PAGE is in control, this must not happen
1079
                        self._base_logger.warning(f"added file {file_.ID} for page {file_.pageId} in input file group {ifg} "
1080
                                                  f"conflicts with file {ift[i].ID} but no PAGE available - on_error={on_error}")
1081 View Code Duplication
                        if on_error == 'skip':
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1082
                            ift[i] = None
1083
                        elif on_error == 'first':
1084
                            pass # keep first match
1085
                        elif on_error == 'last':
1086
                            ift[i] = file_
1087
                        elif on_error == 'abort':
1088
                            raise NonUniqueInputFile(ifg, file_.pageId, None)
1089
                        else:
1090
                            raise Exception("Unknown 'on_error' strategy '%s'" % on_error)
1091
                else:
1092
                    self._base_logger.debug(f"adding file {file_.ID} for page {file_.pageId} to input file group {ifg}")
1093
                    ift[i] = file_
1094
        # Warn if no files found but pageId was specified, because that might be due to invalid page_id (range)
1095
        if self.page_id and not any(pages):
1096
            self._base_logger.critical(f"Could not find any files for selected pageId {self.page_id}.\n"
1097
                                       f"compare '{self.page_id}' with the output of 'orcd workspace list-page'.")
1098
        ifts = []
1099
        for page, ifiles in pages.items():
1100
            for i, ifg in enumerate(ifgs):
1101
                if not ifiles[i]:
1102
                    # could be from non-unique with on_error=skip or from true gap
1103
                    self._base_logger.error(f'Found no file for page {page} in file group {ifg}')
1104
                    if config.OCRD_MISSING_INPUT == 'abort':
1105
                        raise MissingInputFile(ifg, page, mimetype)
1106
            if not any(ifiles):
1107
                # must be from non-unique with on_error=skip
1108
                self._base_logger.warning(f'Found no files for {page} - skipping')
1109
                continue
1110
            if ifiles[0] or not require_first:
1111
                ifts.append(tuple(ifiles))
1112
        return ifts
1113
1114
_page_worker_processor = None
1115
"""
1116
This global binding for the processor is required to avoid
1117
squeezing the processor through a mp.Queue (which is impossible
1118
due to unpicklable attributes like .workspace.mets._tree anyway)
1119
when calling Processor.process_page_file as page worker processes
1120
in Processor.process_workspace. Forking allows inheriting global
1121
objects, and with the METS Server we do not mutate the local
1122
processor instance anyway.
1123
"""
1124
def _page_worker_set_ctxt(processor, log_queue):
1125
    """
1126
    Overwrites `ocrd.processor.base._page_worker_processor` instance
1127
    for sharing with subprocesses in ProcessPoolExecutor initializer.
1128
    """
1129
    global _page_worker_processor
1130
    _page_worker_processor = processor
1131
    if log_queue:
1132
        # replace all log handlers with just one queue handler
1133
        logging.root.handlers = [logging.handlers.QueueHandler(log_queue)]
1134
1135
def _page_worker(timeout, *input_files):
1136
    """
1137
    Wraps a `Processor.process_page_file` call as payload (call target)
1138
    of the ProcessPoolExecutor workers, but also enforces the given timeout.
1139
    """
1140
    page_id = next((file.pageId for file in input_files
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable file does not seem to be defined.
Loading history...
1141
                    if hasattr(file, 'pageId')), "")
1142
    if timeout > 0:
1143
        timer = Timer(timeout, interrupt_main)
1144
        timer.start()
1145
    try:
1146
        _page_worker_processor.process_page_file(*input_files)
1147
        _page_worker_processor.logger.debug("page worker completed for page %s", page_id)
1148
    except KeyboardInterrupt:
1149
        _page_worker_processor.logger.debug("page worker timed out for page %s", page_id)
1150
        raise TimeoutError()
1151
    finally:
1152
        if timeout > 0:
1153
            timer.cancel()
0 ignored issues
show
introduced by
The variable timer does not seem to be defined in case timeout > 0 on line 1142 is False. Are you sure this can never be the case?
Loading history...
1154
1155
def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
1156
    """Generate a string describing the full CLI of this processor including params.
1157
1158
    Args:
1159
         ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
1160
         processor_instance (object, optional): the processor implementation
1161
             (for adding any module/class/function docstrings)
1162
        subcommand (string): 'worker' or 'server'
1163
    """
1164
    doc_help = ''
1165
    if processor_instance:
1166
        module = inspect.getmodule(processor_instance)
1167
        if module and module.__doc__:
1168
            doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
1169
        if processor_instance.__doc__:
1170
            doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
1171
        # Try to find the most concrete docstring among the various methods that an implementation
1172
        # could overload, first serving.
1173
        # In doing so, compare with Processor to avoid a glitch in the way py>=3.5 inherits docstrings.
1174
        # (They are supposed to only repeat information inspect.getdoc, rather than inherit __doc__ itself.)
1175
        for method in ['process_page_pcgts', 'process_page_file', 'process_workspace', 'process']:
1176
            instance_method = getattr(processor_instance, method)
1177
            superclass_method = getattr(Processor, method)
1178
            if instance_method.__doc__ and instance_method.__doc__ != superclass_method.__doc__:
1179
                doc_help += '\n' + inspect.cleandoc(instance_method.__doc__) + '\n'
1180
                break
1181
        if doc_help:
1182
            doc_help = '\n\n' + wrap_text(doc_help, width=72,
1183
                                          initial_indent='  > ',
1184
                                          subsequent_indent='  > ',
1185
                                          preserve_paragraphs=True)
1186
    subcommands = '''\
1187
    worker      Start a processing worker rather than do local processing
1188
    server      Start a processor server rather than do local processing
1189
'''
1190
1191
    processing_worker_options = '''\
1192
  --queue                         The RabbitMQ server address in format
1193
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
1194
                                  [amqp://admin:admin@localhost:5672]
1195
  --database                      The MongoDB server address in format
1196
                                  "mongodb://{host}:{port}"
1197
                                  [mongodb://localhost:27018]
1198
  --log-filename                  Filename to redirect STDOUT/STDERR to,
1199
                                  if specified.
1200
'''
1201
1202
    processing_server_options = '''\
1203
  --address                       The Processor server address in format
1204
                                  "{host}:{port}"
1205
  --database                      The MongoDB server address in format
1206
                                  "mongodb://{host}:{port}"
1207
                                  [mongodb://localhost:27018]
1208
'''
1209
1210
    processing_options = '''\
1211
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
1212
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
1213
  -I, --input-file-grp USE        File group(s) used as input
1214
  -O, --output-file-grp USE       File group(s) used as output
1215
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
1216
  --overwrite                     Remove existing output pages/images
1217
                                  (with "--page-id", remove only those).
1218
                                  Short-hand for OCRD_EXISTING_OUTPUT=OVERWRITE
1219
  --debug                         Abort on any errors with full stack trace.
1220
                                  Short-hand for OCRD_MISSING_OUTPUT=ABORT
1221
  --profile                       Enable profiling
1222
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
1223
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
1224
                                  or JSON file path
1225
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
1226
                                  taking precedence over --parameter
1227
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
1228
                                  If URL starts with http:// start an HTTP server there,
1229
                                  otherwise URL is a path to an on-demand-created unix socket
1230
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
1231
                                  Override log level globally [INFO]
1232
  --log-filename LOG-PATH         File to redirect stderr logging to (overriding ocrd_logging.conf).
1233
'''
1234
1235
    information_options = '''\
1236
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
1237
  -L, --list-resources            List names of processor resources
1238
  -J, --dump-json                 Dump tool description as JSON
1239
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
1240
  -h, --help                      Show this message
1241
  -V, --version                   Show version
1242
'''
1243
1244
    parameter_help = ''
1245
    if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']:
1246
        parameter_help = '  NONE\n'
1247
    else:
1248
        def wrap(s):
1249
            return wrap_text(s, initial_indent=' '*3,
1250
                             subsequent_indent=' '*4,
1251
                             width=72, preserve_paragraphs=True)
1252
        for param_name, param in ocrd_tool['parameters'].items():
1253
            parameter_help += wrap('"%s" [%s%s]' % (
1254
                param_name,
1255
                param['type'],
1256
                ' - REQUIRED' if 'required' in param and param['required'] else
1257
                ' - %s' % json.dumps(param['default']) if 'default' in param else ''))
1258
            parameter_help += '\n ' + wrap(param['description'])
1259
            if 'enum' in param:
1260
                parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum']))
1261
            parameter_help += "\n"
1262
1263
    if not subcommand:
1264
        return f'''\
1265
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]
1266
1267
  {ocrd_tool['description']}{doc_help}
1268
1269
Subcommands:
1270
{subcommands}
1271
Options for processing:
1272
{processing_options}
1273
Options for information:
1274
{information_options}
1275
Parameters:
1276
{parameter_help}
1277
'''
1278
    elif subcommand == 'worker':
1279
        return f'''\
1280
Usage: {ocrd_tool['executable']} worker [OPTIONS]
1281
1282
  Run {ocrd_tool['executable']} as a processing worker.
1283
1284
  {ocrd_tool['description']}{doc_help}
1285
1286
Options:
1287
{processing_worker_options}
1288
'''
1289
    elif subcommand == 'server':
1290
        return f'''\
1291
Usage: {ocrd_tool['executable']} server [OPTIONS]
1292
1293
  Run {ocrd_tool['executable']} as a processor sever.
1294
1295
  {ocrd_tool['description']}{doc_help}
1296
1297
Options:
1298
{processing_server_options}
1299
'''
1300
    else:
1301
        pass
1302