Passed
Pull Request — master (#1240)
by Konstantin
03:00
created

ocrd.decorators   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 201
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 43
eloc 169
dl 0
loc 201
rs 8.96
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
C check_and_run_network_agent() 0 45 11
F ocrd_cli_wrap_processor() 0 127 32

How to fix   Complexity   

Complexity

Complex classes like ocrd.decorators often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import sys
2
3
from ocrd_utils import (
4
    config,
5
    initLogging,
6
    is_local_filename,
7
    get_local_filename,
8
    getLogger,
9
    parse_json_string_with_comments,
10
    set_json_key_value_overrides,
11
    parse_json_string_or_file,
12
)
13
from ocrd_validators import WorkspaceValidator
14
from ocrd_network import ProcessingWorker, ProcessorServer, AgentType
15
16
from ..resolver import Resolver
17
from ..processor.base import ResourceNotFoundError, run_processor
18
19
from .loglevel_option import ocrd_loglevel
20
from .parameter_option import parameter_option, parameter_override_option
21
from .ocrd_cli_options import ocrd_cli_options
22
from .mets_find_options import mets_find_options
23
24
# Network agent types accepted as the CLI `subcommand`; any other value is
# rejected by check_and_run_network_agent().
SUBCOMMANDS = [AgentType.PROCESSING_WORKER, AgentType.PROCESSOR_SERVER]
25
26
27
def ocrd_cli_wrap_processor(
    processorClass,
    mets=None,
    mets_server_url=None,
    working_dir=None,
    dump_json=False,
    dump_module_dir=False,
    help=False, # pylint: disable=redefined-builtin
    profile=False,
    profile_file=None,
    version=False,
    overwrite=False,
    debug=False,
    resolve_resource=None,
    show_resource=None,
    list_resources=False,
    # ocrd_network params start #
    subcommand=None,
    address=None,
    queue=None,
    log_filename=None,
    database=None,
    # ocrd_network params end #
    **kwargs
):
    """
    Dispatch the parsed command-line options of a processor.

    The options are handled in this order:

    1. Informational flags (no CLI arguments at all, ``help``, ``version``,
       ``dump_json``, ``dump_module_dir``, ``resolve_resource``,
       ``show_resource``, ``list_resources``): perform the action and exit.
    2. ``subcommand``: hand over to :func:`check_and_run_network_agent`,
       which exits the process itself.
    3. Otherwise: a single processing run via ``run_processor``.

    Args:
        processorClass: processor class; instantiated once with ``None`` to
            serve the informational flags and resource lookups.
        mets, mets_server_url, working_dir: passed through
            ``Resolver.resolve_mets_arguments`` to locate the workspace.
        profile, profile_file: enable CPU profiling; also switched on by
            ``config.OCRD_PROFILE`` containing ``'CPU'`` or by
            ``OCRD_PROFILE_FILE`` being set.
        overwrite: sets ``config.OCRD_EXISTING_OUTPUT`` to ``'OVERWRITE'``
            and skips the output file group check.
        debug: sets the ``OCRD_MISSING_INPUT``, ``OCRD_MISSING_OUTPUT`` and
            ``OCRD_EXISTING_OUTPUT`` config values to ``'ABORT'``
            (``overwrite`` takes precedence for the latter).
        subcommand, address, queue, database: network agent options; the
            latter three are only valid together with ``subcommand``.
        log_filename: accepted but not used by this function.
        **kwargs: must contain ``input_file_grp`` and ``output_file_grp``;
            ``parameter``, ``parameter_override`` and ``page_id`` are
            optional. Everything is forwarded to ``run_processor``.

    Raises:
        ValueError: network options given without a subcommand, or
            ``-I``/``-O`` empty.
        Exception: the input/output file groups fail workspace validation.
        SystemExit: after any informational flag or network subcommand.
    """
    # FIXME: remove workspace arg entirely
    processor = processorClass(None)
    if not sys.argv[1:]:
        processor.show_help(subcommand=subcommand)
        sys.exit(1)
    if help:
        processor.show_help(subcommand=subcommand)
        sys.exit()
    if version:
        processor.show_version()
        sys.exit()
    if dump_json:
        processor.dump_json()
        sys.exit()
    if dump_module_dir:
        processor.dump_module_dir()
        sys.exit()
    if resolve_resource:
        _run_resource_action_and_exit(
            lambda: print(processor.resolve_resource(resolve_resource)))
    if show_resource:
        _run_resource_action_and_exit(
            lambda: processor.show_resource(show_resource))
    if list_resources:
        processor.list_resources()
        sys.exit()
    if subcommand:
        # Used for checking/starting network agents for the WebAPI architecture
        check_and_run_network_agent(processorClass, subcommand, address, database, queue)
    elif address or queue or database:
        raise ValueError(f"Subcommand options --address --queue and --database are only valid for subcommands: {SUBCOMMANDS}")

    # from here: single-run processing context
    initLogging()
    _resolve_parameters(processor, kwargs)
    # Assert -I / -O
    if not kwargs['input_file_grp']:
        raise ValueError('-I/--input-file-grp is required')
    if not kwargs['output_file_grp']:
        raise ValueError('-O/--output-file-grp is required')
    resolver = Resolver()
    working_dir, mets, _, mets_server_url = \
            resolver.resolve_mets_arguments(working_dir, mets, None, mets_server_url)
    workspace = resolver.workspace_from_url(mets, working_dir, mets_server_url=mets_server_url)
    page_id = kwargs.get('page_id')
    if debug:
        config.OCRD_MISSING_INPUT = 'ABORT'
        config.OCRD_MISSING_OUTPUT = 'ABORT'
        config.OCRD_EXISTING_OUTPUT = 'ABORT'
    if overwrite:
        config.OCRD_EXISTING_OUTPUT = 'OVERWRITE'
    # With --overwrite the output file group is allowed to exist already,
    # so an empty string is passed instead of the group name.
    report = WorkspaceValidator.check_file_grp(
        workspace,
        kwargs['input_file_grp'],
        '' if overwrite else kwargs['output_file_grp'],
        page_id)
    if not report.is_valid:
        raise Exception("Invalid input/output file grps:\n\t%s" % '\n\t'.join(report.errors))
    # Set up profiling behavior from environment variables/flags
    if not profile and 'CPU' in config.OCRD_PROFILE:
        profile = True
    if not profile_file and config.is_set('OCRD_PROFILE_FILE'):
        profile_file = config.OCRD_PROFILE_FILE
    if profile or profile_file:
        _start_cpu_profiling(profile_file)
    run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs)


def _run_resource_action_and_exit(action):
    """Call ``action()`` and exit: status 0 on success, 1 (after logging) if the resource is missing."""
    try:
        action()
    except ResourceNotFoundError as e:
        getLogger('ocrd.processor.base').critical(e.message)
        sys.exit(1)
    sys.exit()


def _resolve_parameters(processor, kwargs):
    """Replace ``kwargs['parameter']`` in place by the parsed parameter dict, with overrides merged in."""
    if 'parameter' in kwargs:
        # Disambiguate parameter file/literal, and resolve file
        def resolve(name):
            try:
                return processor.resolve_resource(name)
            except ResourceNotFoundError:
                return None
        kwargs['parameter'] = parse_json_string_or_file(*kwargs['parameter'],
                                                        resolve_preset_file=resolve)
    else:
        kwargs['parameter'] = {}
    # Merge parameter overrides and parameters
    if 'parameter_override' in kwargs:
        set_json_key_value_overrides(kwargs['parameter'], *kwargs['parameter_override'])


def _start_cpu_profiling(profile_file=None):
    """Enable cProfile now; at interpreter exit print the stats and, if ``profile_file`` is set, dump them there."""
    # Imported lazily so that regular (non-profiled) runs do not pay for them.
    import atexit
    import cProfile
    import io
    import pstats

    print("Profiling...")
    profiler = cProfile.Profile()
    profiler.enable()

    def report():
        profiler.disable()
        print("Profiling completed")
        if profile_file:
            # dump_stats() opens and writes the file itself.
            profiler.dump_stats(profile_file)
        stream = io.StringIO()
        pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats()
        print(stream.getvalue())

    atexit.register(report)
154
155
156
def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, database: str, queue: str):
    """
    Validate the network agent options, run the requested agent, then exit.

    Args:
        ProcessorClass: processor class; instantiated with ``workspace=None``
            to read its ``ocrd_tool`` description, and passed on to the agent.
        subcommand: agent type, must be one of ``SUBCOMMANDS``.
        address: ``host:port`` to serve on; required for the processor
            server, rejected for the processing worker.
        database: database address, handed to the agent as ``mongodb_addr``;
            required for both agent types.
        queue: queue address, handed to the worker as ``rabbitmq_addr``;
            required for the processing worker, rejected for the processor
            server.

    Raises:
        ValueError: unknown subcommand, or an option that is missing or not
            applicable for the chosen subcommand.
        SystemExit: with status 0 once the agent call returns.
    """
    if subcommand not in SUBCOMMANDS:
        raise ValueError(f"SUBCOMMAND can only be one of {SUBCOMMANDS}")

    # Both agent types need the database.
    if not database:
        raise ValueError(f"Option '--database' required for subcommand {subcommand}")

    if subcommand == AgentType.PROCESSOR_SERVER:
        if not address:
            raise ValueError(f"Option '--address' required for subcommand {subcommand}")
        if queue:
            raise ValueError(f"Option '--queue' invalid for subcommand {subcommand}")
    if subcommand == AgentType.PROCESSING_WORKER:
        if address:
            raise ValueError(f"Option '--address' invalid for subcommand {subcommand}")
        if not queue:
            raise ValueError(f"Option '--queue' required for subcommand {subcommand}")

    processor = ProcessorClass(workspace=None)
    if subcommand == AgentType.PROCESSING_WORKER:
        processing_worker = ProcessingWorker(
            rabbitmq_addr=queue,
            mongodb_addr=database,
            processor_name=processor.ocrd_tool['executable'],
            ocrd_tool=processor.ocrd_tool,
            processor_class=ProcessorClass,
        )
        # The RMQConsumer is initialized and a connection to the RabbitMQ is performed
        processing_worker.connect_consumer()
        # Start consuming from the queue with name `processor_name`
        processing_worker.start_consuming()
    elif subcommand == AgentType.PROCESSOR_SERVER:
        # TODO: Better validate that inside the ProcessorServer itself
        host, port = address.split(':')
        processor_server = ProcessorServer(
            mongodb_addr=database,
            processor_name=processor.ocrd_tool['executable'],
            processor_class=ProcessorClass,
        )
        processor_server.run_server(host=host, port=int(port))
    else:
        # Defensive: not reachable while the membership check above and the
        # branches here cover the same agent types.
        raise ValueError(f"Unknown network agent type, must be one of: {SUBCOMMANDS}")
    sys.exit(0)
201