Passed — Push to master (71c0c1...5d69e4) by Konstantin, created 02:52

ocrd.processor.helpers.get_processor() — grade B

Complexity:  Conditions 6
Size:        Total Lines 35, Code Lines 27
Duplication: Lines 0, Ratio 0 %
Importance:  Changes 0
Metric  Value
eloc    27
dl      0
loc     35
rs      8.2986
c       0
b       0
f       0
cc      6
nop     7
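
The size and complexity figures above (35 total lines, 27 code lines, cyclomatic complexity 6, 7 parameters for get_processor) can be checked locally. The report does not name the analyzer it used, so the following is only a sketch with radon, a common Python metrics package; the file path is an assumption.

# Sketch only: radon yields comparable size/complexity numbers;
# the path 'ocrd/processor/helpers.py' is assumed, not taken from the report.
from radon.complexity import cc_visit
from radon.raw import analyze

with open('ocrd/processor/helpers.py', encoding='utf-8') as src:
    source = src.read()

raw = analyze(source)   # namedtuple with loc, lloc, sloc, comments, blank, ...
print('total lines:', raw.loc, 'code lines:', raw.sloc)
for block in cc_visit(source):   # per-function cyclomatic complexity
    if block.name == 'get_processor':
        print(block.name, 'complexity:', block.complexity)
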
"""
Helper methods for running and documenting processors
"""
from time import perf_counter, process_time
from os import times
from functools import lru_cache
import json
import inspect
from subprocess import run
from typing import List, Optional

from ..workspace import Workspace
from ocrd_utils import freeze_args, getLogger, config, setOverrideLogLevel, getLevelName, sparkline


__all__ = [
    'run_cli',
    'run_processor'
]


def _get_workspace(workspace=None, resolver=None, mets_url=None, working_dir=None, mets_server_url=None):
    if workspace is None:
        if resolver is None:
            raise Exception("Need to pass a resolver to create a workspace")
        if mets_url is None:
            raise Exception("Need to pass mets_url to create a workspace")
        workspace = resolver.workspace_from_url(mets_url, dst_dir=working_dir, mets_server_url=mets_server_url)
    return workspace

def run_processor(
        processorClass,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        log_level=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
        instance_caching=False
): # pylint: disable=too-many-locals
    """
    Instantiate a Pythonic processor, open a workspace, run the processor and save the workspace.

    If :py:attr:`workspace` is not None, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Instantiate a Python object for :py:attr:`processorClass`, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    Warning: Avoid setting the `instance_caching` flag to True. It may have unexpected side effects.
    This flag supports an experimental feature we would like to adopt in the future.

    Run the processor on the workspace (creating output files in the filesystem).

    Finally, write back the workspace (updating the METS in the filesystem).

    Args:
        processorClass (object): Python class of the module processor.
    """
    if log_level:
        setOverrideLogLevel(log_level)
    workspace = _get_workspace(
        workspace,
        resolver,
        mets_url,
        working_dir,
        mets_server_url
    )
    log = getLogger('ocrd.processor.helpers.run_processor')
    log.debug("Running processor %s", processorClass)

    processor = get_processor(
        processorClass,
        parameter=parameter,
        workspace=None,
        page_id=page_id,
        input_file_grp=input_file_grp,
        output_file_grp=output_file_grp,
        instance_caching=instance_caching
    )

    ocrd_tool = processor.ocrd_tool
    name = '%s v%s' % (ocrd_tool['executable'], processor.version)
    otherrole = ocrd_tool.get('steps', [''])[0]
    logProfile = getLogger('ocrd.process.profile')
    log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
    t0_wall = perf_counter()
    t0_cpu = process_time()
    t0_os = times()
    if any(x in config.OCRD_PROFILE for x in ['RSS', 'PSS']):
        backend = 'psutil_pss' if 'PSS' in config.OCRD_PROFILE else 'psutil'
        from memory_profiler import memory_usage # pylint: disable=import-outside-toplevel
        try:
            mem_usage = memory_usage(proc=(processor.process_workspace, [workspace], {}),
                                     # only run process once
                                     max_iterations=1,
                                     interval=.1, timeout=None, timestamps=True,
                                     # include sub-processes
                                     multiprocess=True, include_children=True,
                                     # get proportional set size instead of RSS
                                     backend=backend)
        except Exception as err:
            log.exception("Failure in processor '%s'" % ocrd_tool['executable'])
            raise err
        mem_usage_values = [mem for mem, _ in mem_usage]
        mem_output = 'memory consumption: '
        mem_output += sparkline(mem_usage_values)
        mem_output += ' max: %.2f MiB min: %.2f MiB' % (max(mem_usage_values), min(mem_usage_values))
        logProfile.info(mem_output)
    else:
        try:
            processor.process_workspace(workspace)
        except Exception as err:
            log.exception("Failure in processor '%s'" % ocrd_tool['executable'])
            raise err

    t1_wall = perf_counter() - t0_wall
    t1_cpu = process_time() - t0_cpu
    t1_os = times()
    # add CPU time from child processes (page worker etc)
    t1_cpu += t1_os.children_user - t0_os.children_user
    t1_cpu += t1_os.children_system - t0_os.children_system
    logProfile.info(
        "Executing processor '%s' took %fs (wall) %fs (CPU) "
        "[--input-file-grp='%s' --output-file-grp='%s' --parameter='%s' --page-id='%s']",
        ocrd_tool['executable'],
        t1_wall,
        t1_cpu,
        processor.input_file_grp or '',
        processor.output_file_grp or '',
        json.dumps(processor.parameter) or '',
        processor.page_id or ''
    )
    workspace.mets.add_agent(
        name=name,
        _type='OTHER',
        othertype='SOFTWARE',
        role='OTHER',
        otherrole=otherrole,
        notes=[({'option': 'input-file-grp'}, processor.input_file_grp or ''),
               ({'option': 'output-file-grp'}, processor.output_file_grp or ''),
               ({'option': 'parameter'}, json.dumps(processor.parameter or '')),
               ({'option': 'page-id'}, processor.page_id or '')]
    )
    workspace.save_mets()
    return processor

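# Example (hypothetical, not part of this module): a minimal sketch of calling
# run_processor() with a Resolver and a local METS file. "MyProcessor" and the
# file group names are placeholders, not definitions from this codebase.
#
#   from ocrd import Resolver
#   processor = run_processor(
#       MyProcessor,                       # any ocrd.Processor subclass
#       resolver=Resolver(),
#       mets_url='mets.xml',
#       input_file_grp='OCR-D-IMG',
#       output_file_grp='OCR-D-SEG',
#       parameter={},
#   )
#   # The workspace is opened via Resolver.workspace_from_url(), processed,
#   # and written back (workspace.save_mets()) before the instance is returned.
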
def run_cli(
        executable,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        overwrite=None,
        debug=None,
        log_level=None,
        log_filename=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
):
    """
    Open a workspace and run a processor on the command line.

    If :py:attr:`workspace` is not None, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Run the processor CLI :py:attr:`executable` on the workspace, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    (Will create output files and update the METS in the filesystem).

    Args:
        executable (string): Executable name of the module processor.
    """
    workspace = _get_workspace(workspace, resolver, mets_url, working_dir)
    args = [executable, '--working-dir', workspace.directory]
    args += ['--mets', mets_url]
    if log_level:
        args += ['--log-level', log_level if isinstance(log_level, str) else getLevelName(log_level)]
    if page_id:
        args += ['--page-id', page_id]
    if input_file_grp:
        args += ['--input-file-grp', input_file_grp]
    if output_file_grp:
        args += ['--output-file-grp', output_file_grp]
    if parameter:
        args += ['--parameter', parameter]
    if overwrite:
        args += ['--overwrite']
    if debug:
        args += ['--debug']
    if mets_server_url:
        args += ['--mets-server-url', mets_server_url]
    log = getLogger('ocrd.processor.helpers.run_cli')
    log.debug("Running subprocess '%s'", ' '.join(args))
    if not log_filename:
        result = run(args, check=False)
    else:
        with open(log_filename, 'a', encoding='utf-8') as file_desc:
            result = run(args, check=False, stdout=file_desc, stderr=file_desc)
    return result.returncode

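# Example (hypothetical): run_cli() drives a processor through its CLI instead
# of its Python class, building an argv like
#   <executable> --working-dir <dir> --mets <url> [--page-id ...] [--parameter ...]
# and returning the subprocess return code. 'ocrd-dummy' and the log file name
# below are placeholders; Resolver is imported from ocrd as in the example above.
#
#   returncode = run_cli(
#       'ocrd-dummy',
#       mets_url='mets.xml',
#       resolver=Resolver(),
#       input_file_grp='OCR-D-IMG',
#       output_file_grp='OCR-D-COPY',
#       log_filename='ocrd-dummy.log',     # stdout/stderr appended here
#   )
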
# not decorated here but at runtime (on first use)
#@freeze_args
#@lru_cache(maxsize=config.OCRD_MAX_PROCESSOR_CACHE)
def get_cached_processor(parameter: dict, processor_class):
    """
    Call this function to get back an instance of a processor.
    The results are cached based on the parameters.
    Args:
        parameter (dict): a dictionary of parameters.
        processor_class: the concrete :py:class:`~ocrd.Processor` class.
    Returns:
        When the concrete class of the processor is unknown, `None` is returned.
        Otherwise, an instance of the :py:class:`~ocrd.Processor` is returned.
    """
    if processor_class:
        processor = processor_class(None, parameter=dict(parameter))
        return processor
    return None

def get_processor(
        processor_class,
        parameter: Optional[dict] = None,
        workspace: Optional[Workspace] = None,
        page_id: Optional[str] = None,
        input_file_grp: Optional[List[str]] = None,
        output_file_grp: Optional[List[str]] = None,
        instance_caching: bool = False,
):
    if processor_class:
        if parameter is None:
            parameter = {}
        if instance_caching:
            global get_cached_processor
            if not hasattr(get_cached_processor, '__wrapped__'):
                # first call: wrap in a call cache,
                # freezing the dict argument into a hashable frozendict (from https://github.com/OCR-D/core/pull/884)
                if processor_class.max_instances < 0:
                    maxsize = config.OCRD_MAX_PROCESSOR_CACHE
                else:
                    maxsize = min(config.OCRD_MAX_PROCESSOR_CACHE, processor_class.max_instances)
                get_cached_processor = freeze_args(lru_cache(maxsize=maxsize)(get_cached_processor))
            processor = get_cached_processor(parameter, processor_class)
        else:
            # avoid passing workspace already (deprecated chdir behaviour)
            processor = processor_class(None, parameter=parameter)
        assert processor
        # set current processing parameters
        processor.workspace = workspace
        processor.page_id = page_id
        processor.input_file_grp = input_file_grp
        processor.output_file_grp = output_file_grp
        return processor
    raise ValueError("Processor class is not known")
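
For illustration, a short sketch of how get_processor() behaves with and without instance_caching. With instance_caching=True, get_cached_processor is wrapped on first use in freeze_args and an lru_cache sized by config.OCRD_MAX_PROCESSOR_CACHE (or the class's max_instances), so repeated calls with equal parameters return the same instance, and only the per-run attributes (workspace, page_id, file groups) are reassigned. The processor class and parameter values below are placeholders, and the module's own docstring warns that the flag is experimental.

from ocrd.processor.helpers import get_processor

# MyProcessor is a hypothetical ocrd.Processor subclass, not defined here
p1 = get_processor(MyProcessor, parameter={'dpi': 300}, instance_caching=True)
p2 = get_processor(MyProcessor, parameter={'dpi': 300}, instance_caching=True)
p3 = get_processor(MyProcessor, parameter={'dpi': 600}, instance_caching=True)
assert p1 is p2        # same class and parameters -> cached instance reused
assert p1 is not p3    # different parameters -> separate instance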