Passed
Pull Request — master (#1240)
by Konstantin
03:02
created

ocrd.processor.helpers.get_processor()   B

Complexity

Conditions 6

Size

Total Lines 34
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 34
rs 8.3226
c 0
b 0
f 0
cc 6
nop 7
1
"""
2
Helper methods for running and documenting processors
3
"""
4
from time import perf_counter, process_time
5
from functools import lru_cache
6
import json
7
import inspect
8
from subprocess import run
9
from typing import List, Optional
10
11
from click import wrap_text
12
from ocrd.workspace import Workspace
13
from ocrd_utils import freeze_args, getLogger, config, setOverrideLogLevel, getLevelName, sparkline
14
15
16
# Public API of this helper module (used by `from ocrd.processor.helpers import *`)
__all__ = [
    'generate_processor_help',
    'run_cli',
    'run_processor'
]
21
22
23
def _get_workspace(workspace=None, resolver=None, mets_url=None, working_dir=None, mets_server_url=None):
24
    if workspace is None:
25
        if resolver is None:
26
            raise Exception("Need to pass a resolver to create a workspace")
27
        if mets_url is None:
28
            raise Exception("Need to pass mets_url to create a workspace")
29
        workspace = resolver.workspace_from_url(mets_url, dst_dir=working_dir, mets_server_url=mets_server_url)
30
    return workspace
31
32
def run_processor(
        processorClass,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        log_level=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
        instance_caching=False
): # pylint: disable=too-many-locals
    """
    Instantiate a Pythonic processor, open a workspace, run the processor and save the workspace.

    If :py:attr:`workspace` is not none, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Instantiate a Python object for :py:attr:`processorClass`, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    Warning: Avoid setting the `instance_caching` flag to True. It may have unexpected side effects.
    This flag is used for an experimental feature we would like to adopt in future.

    Run the processor on the workspace (creating output files in the filesystem).

    Finally, write back the workspace (updating the METS in the filesystem).

    Args:
        processorClass (object): Python class of the module processor.

    Returns:
        the processor instance that was run
    """
    if log_level:
        setOverrideLogLevel(log_level)
    workspace = _get_workspace(
        workspace,
        resolver,
        mets_url,
        working_dir,
        mets_server_url
    )
    log = getLogger('ocrd.processor.helpers.run_processor')
    log.debug("Running processor %s", processorClass)

    # Instantiate (or fetch from the instance cache); the workspace is handed
    # over later via process_workspace, not at construction time.
    processor = get_processor(
        processorClass,
        parameter=parameter,
        workspace=None,
        page_id=page_id,
        input_file_grp=input_file_grp,
        output_file_grp=output_file_grp,
        instance_caching=instance_caching
    )

    ocrd_tool = processor.ocrd_tool
    name = '%s v%s' % (ocrd_tool['executable'], processor.version)
    otherrole = ocrd_tool['steps'][0]
    logProfile = getLogger('ocrd.process.profile')
    log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
    t0_wall = perf_counter()
    t0_cpu = process_time()
    if any(x in config.OCRD_PROFILE for x in ['RSS', 'PSS']):
        backend = 'psutil_pss' if 'PSS' in config.OCRD_PROFILE else 'psutil'
        from memory_profiler import memory_usage # pylint: disable=import-outside-toplevel
        try:
            mem_usage = memory_usage(proc=(processor.process_workspace, [workspace], {}),
                                     # only run process once
                                     max_iterations=1,
                                     interval=.1, timeout=None, timestamps=True,
                                     # include sub-processes
                                     multiprocess=True, include_children=True,
                                     # get proportional set size instead of RSS
                                     backend=backend)
        except Exception:
            # FIX: lazy %-args instead of eager string interpolation, and a bare
            # `raise` (instead of `raise err`) to re-raise with the original traceback
            log.exception("Failure in processor '%s'", ocrd_tool['executable'])
            raise
        mem_usage_values = [mem for mem, _ in mem_usage]
        mem_output = 'memory consumption: '
        mem_output += sparkline(mem_usage_values)
        mem_output += ' max: %.2f MiB min: %.2f MiB' % (max(mem_usage_values), min(mem_usage_values))
        logProfile.info(mem_output)
    else:
        try:
            processor.process_workspace(workspace)
        except Exception:
            log.exception("Failure in processor '%s'", ocrd_tool['executable'])
            raise

    t1_wall = perf_counter() - t0_wall
    t1_cpu = process_time() - t0_cpu
    # FIX: removed stray '(' after '(CPU)' in the profile message format string
    logProfile.info("Executing processor '%s' took %fs (wall) %fs (CPU) [--input-file-grp='%s' --output-file-grp='%s' --parameter='%s' --page-id='%s']" % (
        ocrd_tool['executable'],
        t1_wall,
        t1_cpu,
        processor.input_file_grp or '',
        processor.output_file_grp or '',
        json.dumps(processor.parameter) or '',
        processor.page_id or ''
    ))
    # Record this processing step as a METS agent entry.
    # NOTE(review): `json.dumps(processor.parameter or '')` serializes an empty
    # parameter dict as '""'; the profile message above uses
    # `json.dumps(processor.parameter) or ''` instead — confirm which form the
    # METS notes are supposed to carry (left unchanged to keep METS output stable).
    workspace.mets.add_agent(
        name=name,
        _type='OTHER',
        othertype='SOFTWARE',
        role='OTHER',
        otherrole=otherrole,
        notes=[({'option': 'input-file-grp'}, processor.input_file_grp or ''),
               ({'option': 'output-file-grp'}, processor.output_file_grp or ''),
               ({'option': 'parameter'}, json.dumps(processor.parameter or '')),
               ({'option': 'page-id'}, processor.page_id or '')]
    )
    workspace.save_mets()
    return processor
150
151
152
def run_cli(
        executable,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        overwrite=None,
        debug=None,
        log_level=None,
        log_filename=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
):
    """
    Open a workspace and run a processor on the command line.

    If :py:attr:`workspace` is not none, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Run the processor CLI :py:attr:`executable` on the workspace, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    (Will create output files and update the METS in the filesystem.)

    Args:
        executable (string): Executable name of the module processor.

    Returns:
        int: return code of the subprocess
    """
    workspace = _get_workspace(workspace, resolver, mets_url, working_dir)
    args = [executable, '--working-dir', workspace.directory]
    # FIX: only pass --mets when a URL was actually given; previously a None
    # mets_url (e.g. when an existing workspace was passed in) ended up in the
    # argument list and made subprocess.run fail. Without --mets, the processor
    # CLI falls back to its default METS path inside --working-dir.
    if mets_url:
        args += ['--mets', mets_url]
    if log_level:
        args += ['--log-level', log_level if isinstance(log_level, str) else getLevelName(log_level)]
    if page_id:
        args += ['--page-id', page_id]
    if input_file_grp:
        args += ['--input-file-grp', input_file_grp]
    if output_file_grp:
        args += ['--output-file-grp', output_file_grp]
    if parameter:
        args += ['--parameter', parameter]
    if overwrite:
        args += ['--overwrite']
    if debug:
        args += ['--debug']
    if mets_server_url:
        args += ['--mets-server-url', mets_server_url]
    log = getLogger('ocrd.processor.helpers.run_cli')
    log.debug("Running subprocess '%s'", ' '.join(args))
    if not log_filename:
        result = run(args, check=False)
    else:
        # redirect both STDOUT and STDERR of the child into the log file
        with open(log_filename, 'a', encoding='utf-8') as file_desc:
            result = run(args, check=False, stdout=file_desc, stderr=file_desc)
    return result.returncode
214
215
216
def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None):
    """Generate a string describing the full CLI of this processor including params.

    Args:
         ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json``
         processor_instance (object, optional): the processor implementation
             (for adding any module/class/function docstrings)
         subcommand (string): 'worker' or 'server'

    Raises:
        ValueError: if ``subcommand`` is given but neither 'worker' nor 'server'
    """
    # Collect any module/class/method docstrings of the implementation
    # and render them as an indented quote block.
    doc_help = ''
    if processor_instance:
        module = inspect.getmodule(processor_instance)
        if module and module.__doc__:
            doc_help += '\n' + inspect.cleandoc(module.__doc__) + '\n'
        if processor_instance.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.__doc__) + '\n'
        if processor_instance.process_workspace.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.process_workspace.__doc__) + '\n'
        if processor_instance.process.__doc__:
            doc_help += '\n' + inspect.cleandoc(processor_instance.process.__doc__) + '\n'
        if doc_help:
            doc_help = '\n\n' + wrap_text(doc_help, width=72,
                                          initial_indent='  > ',
                                          subsequent_indent='  > ',
                                          preserve_paragraphs=True)
    subcommands = '''\
    worker      Start a processing worker rather than do local processing
    server      Start a processor server rather than do local processing
'''

    processing_worker_options = '''\
  --queue                         The RabbitMQ server address in format
                                  "amqp://{user}:{pass}@{host}:{port}/{vhost}"
                                  [amqp://admin:admin@localhost:5672]
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
  --log-filename                  Filename to redirect STDOUT/STDERR to,
                                  if specified.
'''

    processing_server_options = '''\
  --address                       The Processor server address in format
                                  "{host}:{port}"
  --database                      The MongoDB server address in format
                                  "mongodb://{host}:{port}"
                                  [mongodb://localhost:27018]
'''

    processing_options = '''\
  -m, --mets URL-PATH             URL or file path of METS to process [./mets.xml]
  -w, --working-dir PATH          Working directory of local workspace [dirname(URL-PATH)]
  -I, --input-file-grp USE        File group(s) used as input
  -O, --output-file-grp USE       File group(s) used as output
  -g, --page-id ID                Physical page ID(s) to process instead of full document []
  --overwrite                     Remove existing output pages/images
                                  (with "--page-id", remove only those).
                                  Short-hand for OCRD_EXISTING_OUTPUT=OVERWRITE
  --debug                         Abort on any errors with full stack trace.
                                  Short-hand for OCRD_MISSING_OUTPUT=ABORT
  --profile                       Enable profiling
  --profile-file PROF-PATH        Write cProfile stats to PROF-PATH. Implies "--profile"
  -p, --parameter JSON-PATH       Parameters, either verbatim JSON string
                                  or JSON file path
  -P, --param-override KEY VAL    Override a single JSON object key-value pair,
                                  taking precedence over --parameter
  -U, --mets-server-url URL       URL of a METS Server for parallel incremental access to METS
                                  If URL starts with http:// start an HTTP server there,
                                  otherwise URL is a path to an on-demand-created unix socket
  -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE]
                                  Override log level globally [INFO]
'''

    information_options = '''\
  -C, --show-resource RESNAME     Dump the content of processor resource RESNAME
  -L, --list-resources            List names of processor resources
  -J, --dump-json                 Dump tool description as JSON
  -D, --dump-module-dir           Show the 'module' resource location path for this processor
  -h, --help                      Show this message
  -V, --version                   Show version
'''

    # Render the ``parameters`` section of the tool description, one entry per
    # parameter with type, required/default marker, description and enum values.
    parameter_help = ''
    if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']:
        parameter_help = '  NONE\n'
    else:
        def wrap(s):
            return wrap_text(s, initial_indent=' '*3,
                             subsequent_indent=' '*4,
                             width=72, preserve_paragraphs=True)
        for param_name, param in ocrd_tool['parameters'].items():
            parameter_help += wrap('"%s" [%s%s]' % (
                param_name,
                param['type'],
                ' - REQUIRED' if 'required' in param and param['required'] else
                ' - %s' % json.dumps(param['default']) if 'default' in param else ''))
            parameter_help += '\n ' + wrap(param['description'])
            if 'enum' in param:
                parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum']))
            parameter_help += "\n"

    if not subcommand:
        return f'''\
Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS]

  {ocrd_tool['description']}{doc_help}

Subcommands:
{subcommands}
Options for processing:
{processing_options}
Options for information:
{information_options}
Parameters:
{parameter_help}
'''
    elif subcommand == 'worker':
        return f'''\
Usage: {ocrd_tool['executable']} worker [OPTIONS]

  Run {ocrd_tool['executable']} as a processing worker.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_worker_options}
'''
    elif subcommand == 'server':
        # FIX: typo "processor sever" -> "processor server"
        return f'''\
Usage: {ocrd_tool['executable']} server [OPTIONS]

  Run {ocrd_tool['executable']} as a processor server.

  {ocrd_tool['description']}{doc_help}

Options:
{processing_server_options}
'''
    else:
        # FIX: was a silent `pass` (implicitly returning None) for an unknown
        # subcommand; fail loudly instead
        raise ValueError("Unknown subcommand '%s'" % subcommand)
356
357
358
# not decorated here but at runtime (on first use)
#@freeze_args
#@lru_cache(maxsize=config.OCRD_MAX_PROCESSOR_CACHE)
def get_cached_processor(parameter: dict, processor_class):
    """
    Call this function to get back an instance of a processor.
    The results are cached based on the parameters.
    Args:
        parameter (dict): a dictionary of parameters.
        processor_class: the concrete `:py:class:~ocrd.Processor` class.
    Returns:
        When the concrete class of the processor is unknown, `None` is returned.
        Otherwise, an instance of the `:py:class:~ocrd.Processor` is returned.
    """
    if not processor_class:
        return None
    # Copy the (possibly frozen) parameter mapping back into a plain dict
    # before handing it to the constructor; no workspace is passed here.
    return processor_class(None, parameter=dict(parameter))
376
377
def get_processor(
        processor_class,
        parameter: Optional[dict] = None,
        workspace: Optional[Workspace] = None,
        page_id: Optional[str] = None,
        input_file_grp: Optional[List[str]] = None,
        output_file_grp: Optional[List[str]] = None,
        instance_caching: bool = False,
):
    """
    Instantiate a processor and set its current processing parameters.

    With ``instance_caching``, instances are reused across calls for equal
    ``parameter`` dicts via an LRU cache wrapped around
    :py:func:`get_cached_processor` (see below).

    Args:
        processor_class: the concrete `:py:class:~ocrd.Processor` class.
        parameter: parameter dict for the processor (defaults to ``{}``).
        workspace: workspace to assign to the instance (not passed to the
            constructor — deprecated chdir behaviour is avoided).
        page_id: physical page ID(s) to assign to the instance.
        input_file_grp: input file group(s) to assign to the instance.
        output_file_grp: output file group(s) to assign to the instance.
        instance_caching: reuse cached instances keyed on ``parameter``.

    Returns:
        the (new or cached) processor instance.

    Raises:
        ValueError: if ``processor_class`` is falsy.
    """
    if processor_class:
        if parameter is None:
            parameter = {}
        if instance_caching:
            # Rebind the module-level get_cached_processor on first cached use:
            # wrap it in freeze_args + lru_cache (the __wrapped__ attribute set
            # by lru_cache marks that the wrapping already happened).
            global get_cached_processor
            if not hasattr(get_cached_processor, '__wrapped__'):
                # first call: wrap
                # cache size is bounded by the config, and further by the
                # class's max_instances unless that is negative (= unlimited)
                if processor_class.max_instances < 0:
                    maxsize = config.OCRD_MAX_PROCESSOR_CACHE
                else:
                    maxsize = min(config.OCRD_MAX_PROCESSOR_CACHE, processor_class.max_instances)
                # wrapping in call cache
                # wrapping dict into frozendict (from https://github.com/OCR-D/core/pull/884)
                get_cached_processor = freeze_args(lru_cache(maxsize=maxsize)(get_cached_processor))
            processor = get_cached_processor(parameter, processor_class)
        else:
            # avoid passing workspace already (deprecated chdir behaviour)
            processor = processor_class(None, parameter=parameter)
        # set current processing parameters
        processor.workspace = workspace
        processor.page_id = page_id
        processor.input_file_grp = input_file_grp
        processor.output_file_grp = output_file_grp
        return processor
    raise ValueError("Processor class is not known")
411