Passed
Push — master ( 46bee6...69cfcc )
by Konstantin
55s queued 15s
created

ocrd.processor.helpers   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 410
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 43
eloc 229
dl 0
loc 410
rs 8.96
c 0
b 0
f 0

6 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_workspace() 0 8 4
C run_processor() 0 129 6
C run_cli() 0 59 11
A get_cached_processor() 0 17 3
F generate_processor_help() 0 135 16
A get_processor() 0 28 3

How to fix   Complexity   

Complexity

Complex classes like ocrd.processor.helpers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Helper methods for running and documenting processors
3
"""
4
from os import chdir, getcwd
5
from time import perf_counter, process_time
6
from functools import lru_cache
7
import json
8
import inspect
9
from subprocess import run
10
from typing import List
11
12
from click import wrap_text
13
from ocrd.workspace import Workspace
14
from ocrd_utils import freeze_args, getLogger, config, setOverrideLogLevel, getLevelName
15
16
17
# Public API of this module (the names exported by a star import).
__all__ = [
    'generate_processor_help',
    'run_cli',
    'run_processor',
]
22
23
24
def _get_workspace(workspace=None, resolver=None, mets_url=None, working_dir=None, mets_server_url=None):
    """
    Return :py:attr:`workspace` if given, otherwise create one through the resolver.

    Args:
        workspace: an already opened workspace; returned unchanged when not None.
        resolver: object providing ``workspace_from_url``; required when
            :py:attr:`workspace` is None.
        mets_url: METS location handed to ``resolver.workspace_from_url``;
            required when :py:attr:`workspace` is None.
        working_dir: forwarded to the resolver as ``dst_dir``.
        mets_server_url: forwarded to the resolver as ``mets_server_url``.

    Returns:
        The given workspace, or the result of ``resolver.workspace_from_url``.

    Raises:
        ValueError: if a workspace has to be created but :py:attr:`resolver`
            or :py:attr:`mets_url` is missing.
    """
    if workspace is not None:
        return workspace
    if resolver is None:
        raise ValueError("Need to pass a resolver to create a workspace")
    if mets_url is None:
        raise ValueError("Need to pass mets_url to create a workspace")
    return resolver.workspace_from_url(mets_url, dst_dir=working_dir, mets_server_url=mets_server_url)
32
33
def run_processor(
        processorClass,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        log_level=None,
        input_file_grp=None,
        output_file_grp=None,
        show_resource=None,
        list_resources=False,
        parameter=None,
        parameter_override=None,
        working_dir=None,
        mets_server_url=None,
        instance_caching=False
): # pylint: disable=too-many-locals
    """
    Instantiate a Pythonic processor, open a workspace, run the processor and save the workspace.

    If :py:attr:`workspace` is not none, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Instantiate a Python object for :py:attr:`processorClass`, passing:
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter`

    The workspace is attached to the processor after instantiation.

    Warning: Avoid setting the `instance_caching` flag to True. It may have unexpected side effects.
    This flag is used for an experimental feature we would like to adopt in future.

    Run the processor on the workspace (creating output files in the filesystem),
    with the workspace directory as current working directory. The previous
    working directory is always restored, even if the processor fails.

    If ``config.OCRD_PROFILE`` contains ``RSS`` or ``PSS``, the run is wrapped in
    ``memory_profiler.memory_usage`` and the memory consumption is logged.

    Finally, add an agent entry describing the run to the METS and write back
    the workspace (updating the METS in the filesystem).

    Args:
        processorClass (object): Python class of the module processor.
        mets_url: METS location, used only when no workspace is passed.
        resolver: resolver used to create the workspace when none is passed.
        workspace: an already opened workspace to reuse.
        page_id: passed on to the processor.
        log_level: if set, passed to ``setOverrideLogLevel`` before anything else.
        input_file_grp: passed on to the processor.
        output_file_grp: passed on to the processor.
        show_resource: accepted for interface compatibility; not used here.
        list_resources: accepted for interface compatibility; not used here.
        parameter: passed on to the processor.
        parameter_override: accepted for interface compatibility; not used here.
        working_dir: used only when the workspace has to be created.
        mets_server_url: used only when the workspace has to be created.
        instance_caching (bool): reuse a cached processor instance (experimental).

    Returns:
        The processor instance that was run.

    Raises:
        Exception: whatever the processor raises is logged and re-raised unchanged.
    """
    if log_level:
        setOverrideLogLevel(log_level)
    workspace = _get_workspace(
        workspace,
        resolver,
        mets_url,
        working_dir,
        mets_server_url
    )
    log = getLogger('ocrd.processor.helpers.run_processor')
    log.debug("Running processor %s", processorClass)

    old_cwd = getcwd()
    processor = get_processor(
        processor_class=processorClass,
        parameter=parameter,
        workspace=None,
        page_id=page_id,
        input_file_grp=input_file_grp,
        output_file_grp=output_file_grp,
        instance_caching=instance_caching
    )
    processor.workspace = workspace
    mem_usage = None
    chdir(processor.workspace.directory)
    # Everything executed inside the workspace directory is guarded by a single
    # try/finally, so that no failure (tool metadata lookup, optional profiler
    # imports, the processor itself) can leave the process in the wrong cwd.
    try:
        ocrd_tool = processor.ocrd_tool
        name = '%s v%s' % (ocrd_tool['executable'], processor.version)
        otherrole = ocrd_tool['steps'][0]
        log_profile = getLogger('ocrd.process.profile')
        log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
        t0_wall = perf_counter()
        t0_cpu = process_time()
        profile_memory = any(x in config.OCRD_PROFILE for x in ['RSS', 'PSS'])
        if profile_memory:
            backend = 'psutil_pss' if 'PSS' in config.OCRD_PROFILE else 'psutil'
            # optional dependencies, only needed when memory profiling is requested
            from memory_profiler import memory_usage
            from sparklines import sparklines
        try:
            if profile_memory:
                mem_usage = memory_usage(proc=processor.process,
                                         # only run process once
                                         max_iterations=1,
                                         interval=.1, timeout=None, timestamps=True,
                                         # include sub-processes
                                         multiprocess=True, include_children=True,
                                         # get proportional set size instead of RSS
                                         backend=backend)
            else:
                processor.process()
        except Exception:
            log.exception("Failure in processor '%s'", ocrd_tool['executable'])
            raise
    finally:
        chdir(old_cwd)

    if profile_memory:
        # with timestamps=True each sample is a (memory, timestamp) pair
        mem_usage_values = [mem for mem, _ in mem_usage]
        mem_output = 'memory consumption: '
        mem_output += ''.join(sparklines(mem_usage_values))
        mem_output += ' max: %.2f MiB min: %.2f MiB' % (max(mem_usage_values), min(mem_usage_values))
        log_profile.info(mem_output)

    t1_wall = perf_counter() - t0_wall
    t1_cpu = process_time() - t0_cpu
    log_profile.info(
        "Executing processor '%s' took %fs (wall) %fs (CPU)( [--input-file-grp='%s' --output-file-grp='%s' --parameter='%s' --page-id='%s']",
        ocrd_tool['executable'],
        t1_wall,
        t1_cpu,
        processor.input_file_grp or '',
        processor.output_file_grp or '',
        # json.dumps never returns an empty string, so no fallback is needed
        json.dumps(processor.parameter),
        processor.page_id or ''
    )
    workspace.mets.add_agent(
        name=name,
        _type='OTHER',
        othertype='SOFTWARE',
        role='OTHER',
        otherrole=otherrole,
        notes=[({'option': 'input-file-grp'}, processor.input_file_grp or ''),
               ({'option': 'output-file-grp'}, processor.output_file_grp or ''),
               ({'option': 'parameter'}, json.dumps(processor.parameter or '')),
               ({'option': 'page-id'}, processor.page_id or '')]
    )
    workspace.save_mets()
    return processor
162
163
164
def run_cli(
        executable,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        overwrite=None,
        log_level=None,
        log_filename=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
):
    """
    Open a workspace and run a processor on the command line.

    If :py:attr:`workspace` is not none, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Run the processor CLI :py:attr:`executable` as a subprocess, passing:
    - the workspace directory (as ``--working-dir``)
    - :py:attr:`mets_url` (if set)
    - :py:attr:`log_level`
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter`
    - :py:attr:`overwrite`
    - :py:attr:`mets_server_url`

    (Will create output files and update the METS in the filesystem).

    Args:
        executable (string): Executable name of the module processor.
        mets_url: METS location; omitted from the command line when not set.
        resolver: resolver used to create the workspace when none is passed.
        workspace: an already opened workspace to reuse.
        page_id: value for ``--page-id``.
        overwrite: if true, pass ``--overwrite``.
        log_level: value for ``--log-level``; non-string levels are converted
            with ``getLevelName``.
        log_filename: if set, the subprocess' STDOUT and STDERR are appended
            to this file.
        input_file_grp: value for ``--input-file-grp``.
        output_file_grp: value for ``--output-file-grp``.
        parameter: value for ``--parameter``; a non-string value is serialized
            as JSON.
        working_dir: used only when the workspace has to be created.
        mets_server_url: value for ``--mets-server-url``.

    Returns:
        int: the exit code of the subprocess (a non-zero code is not raised).
    """
    workspace = _get_workspace(workspace, resolver, mets_url, working_dir)
    args = [executable, '--working-dir', workspace.directory]
    # A caller may pass an open workspace without mets_url; adding None to the
    # argument list would make both the debug message and the subprocess fail.
    # NOTE(review): without --mets the CLI falls back to its own default METS
    # path - confirm this matches workspaces with a non-default METS name.
    if mets_url:
        args += ['--mets', mets_url]
    if log_level:
        args += ['--log-level', log_level if isinstance(log_level, str) else getLevelName(log_level)]
    if page_id:
        args += ['--page-id', page_id]
    if input_file_grp:
        args += ['--input-file-grp', input_file_grp]
    if output_file_grp:
        args += ['--output-file-grp', output_file_grp]
    if parameter:
        # command line arguments must be strings
        args += ['--parameter', parameter if isinstance(parameter, str) else json.dumps(parameter)]
    if overwrite:
        args += ['--overwrite']
    if mets_server_url:
        args += ['--mets-server-url', mets_server_url]
    log = getLogger('ocrd.processor.helpers.run_cli')
    log.debug("Running subprocess '%s'", ' '.join(args))
    if not log_filename:
        result = run(args, check=False)
    else:
        with open(log_filename, 'a') as file_desc:
            result = run(args, check=False, stdout=file_desc, stderr=file_desc)
    return result.returncode
223
224
225
def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
    """Generate a string describing the full CLI of this processor including params.

    Args:
        ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
        processor_instance (object, optional): the processor implementation
            (for adding any module/class/function docstrings)
        subcommand (string, optional): 'worker' or 'server'; if empty, the
            help for local processing is generated.

    Returns:
        The help text, or None if :py:attr:`subcommand` is neither empty,
        'worker' nor 'server'.
    """
    doc_help = ''
    if processor_instance:
        # collect module, class and process() docstrings, in that order
        module = inspect.getmodule(processor_instance)
        if module and module.__doc__:
            doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
        if processor_instance.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
        if processor_instance.process.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.process.__doc__) + '\n'
        if doc_help:
            doc_help = '\n\n' + wrap_text(doc_help, width=72,
                                          initial_indent='  > ',
                                          subsequent_indent='  > ',
                                          preserve_paragraphs=True)
    subcommands = '''\
    worker      Start a processing worker rather than do local processing
    server      Start a processor server rather than do local processing
'''

    processing_worker_options = '''\
  --queue                         The RabbitMQ server address in format
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
                                  [amqp://admin:admin@localhost:5672]
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
  --log-filename                  Filename to redirect STDOUT/STDERR to,
                                  if specified.
'''

    processing_server_options = '''\
  --address                       The Processor server address in format
                                  "{host}:{port}"
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
'''

    processing_options = '''\
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
  -I, --input-file-grp USE        File group(s) used as input
  -O, --output-file-grp USE       File group(s) used as output
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
  --overwrite                     Remove existing output pages/images
                                  (with "--page-id", remove only those)
  --profile                       Enable profiling
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
                                  or JSON file path
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                  taking precedence over --parameter
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
                                  If URL starts with http:// start an HTTP server there,
                                  otherwise URL is a path to an on-demand-created unix socket
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
                                  Override log level globally [INFO]
'''

    information_options = '''\
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
  -L, --list-resources            List names of processor resources
  -J, --dump-json                 Dump tool description as JSON
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
  -h, --help                      Show this message
  -V, --version                   Show version
'''

    parameters = ocrd_tool.get('parameters')
    if not parameters:
        parameter_help = '  NONE\n'
    else:
        def wrap(s):
            return wrap_text(s, initial_indent=' '*3,
                             subsequent_indent=' '*4,
                             width=72, preserve_paragraphs=True)
        parts = []
        for param_name, param in parameters.items():
            # a required parameter is flagged as such, otherwise show its default (if any)
            if param.get('required'):
                suffix = ' - REQUIRED'
            elif 'default' in param:
                suffix = ' - %s' % json.dumps(param['default'])
            else:
                suffix = ''
            parts.append(wrap('"%s" [%s%s]' % (param_name, param['type'], suffix)))
            parts.append('\n ' + wrap(param['description']))
            if 'enum' in param:
                parts.append('\n ' + wrap('Possible values: %s' % json.dumps(param['enum'])))
            parts.append("\n")
        parameter_help = ''.join(parts)

    if not subcommand:
        return f'''\
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]

  {ocrd_tool['description']}{doc_help}

Subcommands:
{subcommands}
Options for processing:
{processing_options}
Options for information:
{information_options}
Parameters:
{parameter_help}
'''
    if subcommand == 'worker':
        return f'''\
Usage: {ocrd_tool['executable']} worker [OPTIONS]

  Run {ocrd_tool['executable']} as a processing worker.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_worker_options}
'''
    if subcommand == 'server':
        return f'''\
Usage: {ocrd_tool['executable']} server [OPTIONS]

  Run {ocrd_tool['executable']} as a processor server.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_server_options}
'''
    # unknown subcommand: there is no help text to generate
    return None
360
361
362
# Taken from https://github.com/OCR-D/core/pull/884
363
@freeze_args
@lru_cache(maxsize=config.OCRD_MAX_PROCESSOR_CACHE)
def get_cached_processor(parameter: dict, processor_class):
    """
    Call this function to get back an instance of a processor.
    The results are cached based on the parameters (at most
    ``config.OCRD_MAX_PROCESSOR_CACHE`` entries are kept).

    Args:
        parameter (dict): a dictionary of parameters.
        processor_class: the concrete :py:class:`~ocrd.Processor` class.

    Returns:
        When the concrete class of the processor is unknown, `None` is returned.
        Otherwise, an instance of the :py:class:`~ocrd.Processor` is returned,
        created without a workspace.
    """
    if not processor_class:
        return None
    # NOTE(review): ``freeze_args`` presumably passes the parameters in a
    # hashable form for the cache lookup; hand the processor a plain dict.
    dict_params = dict(parameter) if parameter else None
    return processor_class(workspace=None, parameter=dict_params)
380
381
382
def get_processor(
        processor_class,
        parameter: dict,
        workspace: Workspace = None,
        page_id: str = None,
        input_file_grp: List[str] = None,
        output_file_grp: List[str] = None,
        instance_caching: bool = False,
):
    """
    Return an instance of :py:attr:`processor_class` set up for one run.

    Without :py:attr:`instance_caching`, a new instance is created from all
    given arguments. With it, the instance comes from
    :py:func:`get_cached_processor` (looked up by class and parameters) and
    its ``workspace``, ``page_id``, ``input_file_grp`` and ``output_file_grp``
    attributes are overwritten; the same object may therefore be handed out
    again by later calls with equal parameters.

    Args:
        processor_class: the concrete processor class to instantiate.
        parameter (dict): a dictionary of parameters.
        workspace: the workspace to attach to the processor.
        page_id: page ID selection for the processor.
        input_file_grp: input file group(s) for the processor.
        output_file_grp: output file group(s) for the processor.
        instance_caching (bool): reuse a cached instance instead of creating one.

    Returns:
        The processor instance.

    Raises:
        ValueError: if :py:attr:`processor_class` is empty.
    """
    if not processor_class:
        raise ValueError("Processor class is not known")
    if not instance_caching:
        return processor_class(
            workspace=workspace,
            page_id=page_id,
            input_file_grp=input_file_grp,
            output_file_grp=output_file_grp,
            parameter=parameter
        )
    cached_processor = get_cached_processor(
        parameter=parameter,
        processor_class=processor_class
    )
    # the cache is keyed without these values, so they are set on every request
    cached_processor.workspace = workspace
    cached_processor.page_id = page_id
    cached_processor.input_file_grp = input_file_grp
    cached_processor.output_file_grp = output_file_grp
    return cached_processor
410