Passed
Pull Request — master (#1338)
by
unknown
02:09
created

ocrd.decorators   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 184
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 35
eloc 154
dl 0
loc 184
rs 9.6
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
A check_and_run_processing_worker() 0 23 3
F ocrd_cli_wrap_processor() 0 133 32
1
import sys
2
from contextlib import nullcontext
3
4
from ocrd_utils import (
5
    config,
6
    initLogging,
7
    is_local_filename,
8
    get_local_filename,
9
    getLogger,
10
    parse_json_string_with_comments,
11
    set_json_key_value_overrides,
12
    parse_json_string_or_file,
13
    redirect_stderr_and_stdout_to_file,
14
)
15
from ocrd_validators import WorkspaceValidator
16
17
from ..resolver import Resolver
18
from ..processor.base import ResourceNotFoundError, run_processor
19
20
from .loglevel_option import ocrd_loglevel
21
from .parameter_option import parameter_option, parameter_override_option
22
from .ocrd_cli_options import ocrd_cli_options
23
from .mets_find_options import mets_find_options
24
25
26
def ocrd_cli_wrap_processor(
27
    processorClass,
28
    mets=None,
29
    mets_server_url=None,
30
    working_dir=None,
31
    dump_json=False,
32
    dump_module_dir=False,
33
    help=False,  # pylint: disable=redefined-builtin
34
    profile=False,
35
    profile_file=None,
36
    version=False,
37
    overwrite=False,
38
    debug=False,
39
    resolve_resource=None,
40
    show_resource=None,
41
    list_resources=False,
42
    # ocrd_network params start #
43
    subcommand=None,
44
    queue=None,
45
    log_filename=None,
46
    database=None,
47
    # ocrd_network params end #
48
    **kwargs
49
):
50
    # init logging handlers so no imported libs can preempt ours
51
    initLogging()
52
53
    # FIXME: remove workspace arg entirely
54
    processor = processorClass(None)
55
    if not sys.argv[1:]:
56
        processor.show_help(subcommand=subcommand)
57
        sys.exit(1)
58
    if help:
59
        processor.show_help(subcommand=subcommand)
60
        sys.exit()
61
    if version:
62
        processor.show_version()
63
        sys.exit()
64
    if dump_json:
65
        processor.dump_json()
66
        sys.exit()
67
    if dump_module_dir:
68
        processor.dump_module_dir()
69
        sys.exit()
70
    if resolve_resource:
71
        try:
72
            res = processor.resolve_resource(resolve_resource)
73
            print(res)
74
            sys.exit()
75
        except ResourceNotFoundError as e:
76
            log = getLogger('ocrd.processor.base')
77
            log.critical(e.message)
78
            sys.exit(1)
79
    if show_resource:
80
        try:
81
            processor.show_resource(show_resource)
82
            sys.exit()
83
        except ResourceNotFoundError as e:
84
            log = getLogger('ocrd.processor.base')
85
            log.critical(e.message)
86
            sys.exit(1)
87
    if list_resources:
88
        processor.list_resources()
89
        sys.exit()
90
    if subcommand == "worker" or queue or database:
91
        check_and_run_processing_worker(processorClass, database, queue)
92
93
    if 'parameter' in kwargs:
94
        # Disambiguate parameter file/literal, and resolve file
95
        def resolve(name):
96
            try:
97
                return processor.resolve_resource(name)
98
            except ResourceNotFoundError:
99
                return None
100
        kwargs['parameter'] = parse_json_string_or_file(*kwargs['parameter'],
101
                                                        resolve_preset_file=resolve)
102
    else:
103
        kwargs['parameter'] = {}
104
    # Merge parameter overrides and parameters
105
    if 'parameter_override' in kwargs:
106
        set_json_key_value_overrides(kwargs['parameter'], *kwargs.pop('parameter_override'))
107
    # Assert -I / -O
108
    if not kwargs.get('input_file_grp', None):
109
        raise ValueError('-I/--input-file-grp is required')
110
    if 'output_file_grp' not in kwargs:
111
        raise ValueError('-O/--output-file-grp is required')  # actually, it may be None
112
    resolver = Resolver()
113
    working_dir, mets, _, mets_server_url = \
114
        resolver.resolve_mets_arguments(working_dir, mets, None, mets_server_url)
115
    workspace = resolver.workspace_from_url(mets, working_dir, mets_server_url=mets_server_url)
116
    page_id = kwargs.get('page_id')
117
    if debug:
118
        config.OCRD_MISSING_INPUT = 'ABORT'
119
        config.OCRD_MISSING_OUTPUT = 'ABORT'
120
        config.OCRD_EXISTING_OUTPUT = 'ABORT'
121
    if overwrite:
122
        config.OCRD_EXISTING_OUTPUT = 'OVERWRITE'
123
    report = WorkspaceValidator.check_file_grp(workspace,
124
                                               kwargs['input_file_grp'],
125
                                               '' if overwrite else kwargs['output_file_grp'],
126
                                               page_id)
127
    if not report.is_valid:
128
        raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors))
129
    # Set up profiling behavior from environment variables/flags
130
    if not profile and 'CPU' in config.OCRD_PROFILE:
131
        profile = True
132
    if not profile_file and config.is_set('OCRD_PROFILE_FILE'):
133
        profile_file = config.OCRD_PROFILE_FILE
134
    if profile or profile_file:
135
        import cProfile
136
        import pstats
137
        import io
138
        import atexit
139
        print("Profiling...")
140
        pr = cProfile.Profile()
141
        pr.enable()
142
143
        def goexit():
144
            pr.disable()
0 ignored issues
show
introduced by
The variable pr does not seem to be defined in case profile or profile_file on line 134 is False. Are you sure this can never be the case?
Loading history...
145
            print("Profiling completed")
146
            if profile_file:
147
                pr.dump_stats(profile_file)
148
            s = io.StringIO()
0 ignored issues
show
introduced by
The variable io does not seem to be defined in case profile or profile_file on line 134 is False. Are you sure this can never be the case?
Loading history...
149
            pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats()
0 ignored issues
show
introduced by
The variable pstats does not seem to be defined in case profile or profile_file on line 134 is False. Are you sure this can never be the case?
Loading history...
150
            print(s.getvalue())
151
152
        atexit.register(goexit)
153
    if log_filename:
154
        log_ctx = redirect_stderr_and_stdout_to_file(log_filename)
155
    else:
156
        log_ctx = nullcontext()
157
    with log_ctx:
158
        run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs)
159
160
161
def check_and_run_processing_worker(ProcessorClass, database: str, queue: str):
162
    """ Check/start Processing Worker for the WebAPI architecture
163
    """
164
    from ocrd_network import ProcessingWorker
165
166
    if not database:
167
        raise ValueError("Option '--database' is required for the Processing Worker")
168
    if not queue:
169
        raise ValueError("Option '--queue' is required for the Processing Worker")
170
171
    processor = ProcessorClass(workspace=None)
172
    processing_worker = ProcessingWorker(
173
        rabbitmq_addr=queue,
174
        mongodb_addr=database,
175
        processor_name=processor.ocrd_tool['executable'],
176
        ocrd_tool=processor.ocrd_tool,
177
        processor_class=ProcessorClass,
178
    )
179
    # The RMQConsumer is initialized and a connection to the RabbitMQ is performed
180
    processing_worker.connect_consumer()
181
    # Start consuming from the queue with name `processor_name`
182
    processing_worker.start_consuming()
183
    sys.exit(0)
184