Passed
Pull Request — master (#1240)
by Konstantin
03:20
created

ocrd.processor.helpers.get_processor()   B

Complexity

Conditions 6

Size

Total Lines 34
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric   Value
eloc     26
dl       0
loc      34
rs       8.3226
c        0
b        0
f        0
cc       6
nop      7
"""
Helper methods for running and documenting processors
"""
from time import perf_counter, process_time
from functools import lru_cache
import json
import inspect
from subprocess import run
from typing import List, Optional

from ..workspace import Workspace
from ocrd_utils import freeze_args, getLogger, config, setOverrideLogLevel, getLevelName, sparkline


__all__ = [
    'run_cli',
    'run_processor'
]


def _get_workspace(workspace=None, resolver=None, mets_url=None, working_dir=None, mets_server_url=None):
    if workspace is None:
        if resolver is None:
            raise Exception("Need to pass a resolver to create a workspace")
        if mets_url is None:
            raise Exception("Need to pass mets_url to create a workspace")
        workspace = resolver.workspace_from_url(mets_url, dst_dir=working_dir, mets_server_url=mets_server_url)
    return workspace
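
# A minimal usage sketch for _get_workspace (paths are hypothetical): either an
# existing Workspace is reused, or one is opened/cloned from a METS URL via the
# resolver:
#
#     from ocrd import Resolver
#     ws = _get_workspace(resolver=Resolver(),
#                         mets_url='/data/ws1/mets.xml',
#                         working_dir='/data/ws1')
#     assert _get_workspace(workspace=ws) is ws  # an existing workspace is returned as-is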

def run_processor(
        processorClass,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        log_level=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
        instance_caching=False
): # pylint: disable=too-many-locals
    """
    Instantiate a Pythonic processor, open a workspace, run the processor and save the workspace.

    If :py:attr:`workspace` is not None, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Instantiate a Python object for :py:attr:`processorClass`, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    Warning: Avoid setting the `instance_caching` flag to True. It may have unexpected side effects.
    This flag is used for an experimental feature we would like to adopt in the future.

    Run the processor on the workspace (creating output files in the filesystem).

    Finally, write back the workspace (updating the METS in the filesystem).

    Args:
        processorClass (object): Python class of the module processor.
    """
    if log_level:
        setOverrideLogLevel(log_level)
    workspace = _get_workspace(
        workspace,
        resolver,
        mets_url,
        working_dir,
        mets_server_url
    )
    log = getLogger('ocrd.processor.helpers.run_processor')
    log.debug("Running processor %s", processorClass)

    processor = get_processor(
        processorClass,
        parameter=parameter,
        workspace=None,
        page_id=page_id,
        input_file_grp=input_file_grp,
        output_file_grp=output_file_grp,
        instance_caching=instance_caching
    )

    ocrd_tool = processor.ocrd_tool
    name = '%s v%s' % (ocrd_tool['executable'], processor.version)
    otherrole = ocrd_tool['steps'][0]
    logProfile = getLogger('ocrd.process.profile')
    log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
    t0_wall = perf_counter()
    t0_cpu = process_time()
    if any(x in config.OCRD_PROFILE for x in ['RSS', 'PSS']):
        backend = 'psutil_pss' if 'PSS' in config.OCRD_PROFILE else 'psutil'
        from memory_profiler import memory_usage # pylint: disable=import-outside-toplevel
        try:
            mem_usage = memory_usage(proc=(processor.process_workspace, [workspace], {}),
                                     # only run process once
                                     max_iterations=1,
                                     interval=.1, timeout=None, timestamps=True,
                                     # include sub-processes
                                     multiprocess=True, include_children=True,
                                     # get proportional set size instead of RSS
                                     backend=backend)
        except Exception as err:
            log.exception("Failure in processor '%s'" % ocrd_tool['executable'])
            raise err
        mem_usage_values = [mem for mem, _ in mem_usage]
        mem_output = 'memory consumption: '
        mem_output += sparkline(mem_usage_values)
        mem_output += ' max: %.2f MiB min: %.2f MiB' % (max(mem_usage_values), min(mem_usage_values))
        logProfile.info(mem_output)
    else:
        try:
            processor.process_workspace(workspace)
        except Exception as err:
            log.exception("Failure in processor '%s'" % ocrd_tool['executable'])
            raise err
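
    # Note: the memory profiling branch above only runs when the OCRD_PROFILE
    # configuration variable (read from the environment via ocrd_utils.config)
    # contains 'RSS' or 'PSS', and it requires the optional memory_profiler
    # package. Shell sketch (ocrd-dummy is just an example processor):
    #
    #     OCRD_PROFILE=RSS ocrd-dummy -I OCR-D-IMG -O OCR-D-DUMMY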

    t1_wall = perf_counter() - t0_wall
    t1_cpu = process_time() - t0_cpu
    logProfile.info("Executing processor '%s' took %fs (wall) %fs (CPU) [--input-file-grp='%s' --output-file-grp='%s' --parameter='%s' --page-id='%s']" % (
        ocrd_tool['executable'],
        t1_wall,
        t1_cpu,
        processor.input_file_grp or '',
        processor.output_file_grp or '',
        json.dumps(processor.parameter) or '',
        processor.page_id or ''
    ))
    workspace.mets.add_agent(
        name=name,
        _type='OTHER',
        othertype='SOFTWARE',
        role='OTHER',
        otherrole=otherrole,
        notes=[({'option': 'input-file-grp'}, processor.input_file_grp or ''),
               ({'option': 'output-file-grp'}, processor.output_file_grp or ''),
               ({'option': 'parameter'}, json.dumps(processor.parameter or '')),
               ({'option': 'page-id'}, processor.page_id or '')]
    )
    workspace.save_mets()
    return processor
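
# A minimal usage sketch for run_processor (MyProcessor and the METS path are
# hypothetical, not part of this module):
#
#     from ocrd import Resolver
#     processor = run_processor(MyProcessor,
#                               mets_url='/data/ws1/mets.xml',
#                               resolver=Resolver(),
#                               input_file_grp='OCR-D-IMG',
#                               output_file_grp='OCR-D-OUT',
#                               parameter={})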


def run_cli(
        executable,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        overwrite=None,
        debug=None,
        log_level=None,
        log_filename=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
):
    """
    Open a workspace and run a processor on the command line.

    If :py:attr:`workspace` is not None, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Run the processor CLI :py:attr:`executable` on the workspace, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    (Will create output files and update the METS in the filesystem.)

    Args:
        executable (string): Executable name of the module processor.
    """
    workspace = _get_workspace(workspace, resolver, mets_url, working_dir)
    args = [executable, '--working-dir', workspace.directory]
    args += ['--mets', mets_url]
    if log_level:
        args += ['--log-level', log_level if isinstance(log_level, str) else getLevelName(log_level)]
    if page_id:
        args += ['--page-id', page_id]
    if input_file_grp:
        args += ['--input-file-grp', input_file_grp]
    if output_file_grp:
        args += ['--output-file-grp', output_file_grp]
    if parameter:
        args += ['--parameter', parameter]
    if overwrite:
        args += ['--overwrite']
    if debug:
        args += ['--debug']
    if mets_server_url:
        args += ['--mets-server-url', mets_server_url]
    log = getLogger('ocrd.processor.helpers.run_cli')
    log.debug("Running subprocess '%s'", ' '.join(args))
    if not log_filename:
        result = run(args, check=False)
    else:
        with open(log_filename, 'a', encoding='utf-8') as file_desc:
            result = run(args, check=False, stdout=file_desc, stderr=file_desc)
    return result.returncode
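
# A minimal usage sketch for run_cli (executable name, paths and file groups are
# examples only; any OCR-D processor CLI on the PATH would do):
#
#     from ocrd import Resolver
#     returncode = run_cli('ocrd-dummy',
#                          mets_url='/data/ws1/mets.xml',
#                          resolver=Resolver(),
#                          input_file_grp='OCR-D-IMG',
#                          output_file_grp='OCR-D-DUMMY',
#                          log_filename='/tmp/ocrd-dummy.log')
#     assert returncode == 0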



# not decorated here but at runtime (on first use)
#@freeze_args
#@lru_cache(maxsize=config.OCRD_MAX_PROCESSOR_CACHE)
def get_cached_processor(parameter: dict, processor_class):
    """
    Call this function to get back an instance of a processor.
    The results are cached based on the parameters.
    Args:
        parameter (dict): a dictionary of parameters.
        processor_class: the concrete :py:class:`~ocrd.Processor` class.
    Returns:
        When the concrete class of the processor is unknown, `None` is returned.
        Otherwise, an instance of the :py:class:`~ocrd.Processor` is returned.
    """
    if processor_class:
        processor = processor_class(None, parameter=dict(parameter))
        return processor
    return None

def get_processor(
        processor_class,
        parameter: Optional[dict] = None,
        workspace: Workspace = None,
        page_id: str = None,
        input_file_grp: List[str] = None,
        output_file_grp: List[str] = None,
        instance_caching: bool = False,
):
    if processor_class:
        if parameter is None:
            parameter = {}
        if instance_caching:
            global get_cached_processor
            if not hasattr(get_cached_processor, '__wrapped__'):
                # first call: wrap
                if processor_class.max_instances < 0:
                    maxsize = config.OCRD_MAX_PROCESSOR_CACHE
                else:
                    maxsize = min(config.OCRD_MAX_PROCESSOR_CACHE, processor_class.max_instances)
                # wrapping in call cache
                # wrapping dict into frozendict (from https://github.com/OCR-D/core/pull/884)
                get_cached_processor = freeze_args(lru_cache(maxsize=maxsize)(get_cached_processor))
            processor = get_cached_processor(parameter, processor_class)
        else:
            # avoid passing workspace already (deprecated chdir behaviour)
            processor = processor_class(None, parameter=parameter)
        # set current processing parameters
        processor.workspace = workspace
        processor.page_id = page_id
        processor.input_file_grp = input_file_grp
        processor.output_file_grp = output_file_grp
        return processor
    raise ValueError("Processor class is not known")
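
# A minimal sketch of the instance_caching behaviour (MyProcessor is
# hypothetical): once get_cached_processor has been wrapped in freeze_args and
# lru_cache, calls with equal parameters yield the same cached instance.
#
#     p1 = get_processor(MyProcessor, parameter={'dpi': 300}, instance_caching=True)
#     p2 = get_processor(MyProcessor, parameter={'dpi': 300}, instance_caching=True)
#     assert p1 is p2       # cache hit: identical instance
#     p3 = get_processor(MyProcessor, parameter={'dpi': 600}, instance_caching=True)
#     assert p3 is not p1   # different parameters: new instance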