Passed
Push — master ( de8337...c2057a )
by Konstantin
47s queued 13s
created

ocrd.task_sequence.run_tasks()   B

Complexity

Conditions 5

Size

Total Lines 39
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 25
dl 0
loc 39
rs 8.8133
c 0
b 0
f 0
cc 5
nop 4
1
import json
2
from shlex import split as shlex_split
3
from distutils.spawn import find_executable as which # pylint: disable=import-error,no-name-in-module
4
from subprocess import run, PIPE
5
from collections import Counter
6
7
from ocrd_utils import getLogger, parse_json_string_or_file
8
from ocrd.processor.base import run_cli
9
from ocrd.resolver import Resolver
10
from ocrd_validators import ParameterValidator, WorkspaceValidator, ValidationReport
11
12
class ProcessorTask():
    """
    One step of a task sequence: a single call of an ``ocrd-*`` processor.

    Instances carry the executable name, the lists of input and output file
    groups and an optional parameter value (handed to
    ``parse_json_string_or_file`` during validation).
    """

    @classmethod
    def parse(cls, argstr):
        """
        Build a task from a description such as ``'foo -I IN1,IN2 -O OUT -p params.json'``.

        The first token is the processor name without its ``ocrd-`` prefix.
        It may be followed by any number of ``-I`` / ``-O`` options (each
        taking a comma-separated list of file groups) and ``-p`` options
        (taking the parameter value; the last one wins).

        Raises:
            Exception: if the description is empty, an option is missing its
                argument, or an unknown token is encountered.
        """
        tokens = shlex_split(argstr)
        if not tokens:
            # Without this guard ``tokens.pop(0)`` fails with a bare IndexError.
            raise Exception("Failed parsing task description '%s': no processor name given" % argstr)
        executable = 'ocrd-%s' % tokens.pop(0)
        input_file_grps = []
        output_file_grps = []
        parameter_path = None
        while tokens:
            option = tokens[0]
            if option not in ('-I', '-O', '-p'):
                raise Exception("Failed parsing task description '%s' with tokens remaining: '%s'" % (argstr, tokens))
            if len(tokens) < 2:
                # A trailing option used to surface as an IndexError on tokens[1].
                raise Exception("Failed parsing task description '%s': option '%s' requires an argument" % (argstr, option))
            value = tokens[1]
            if option == '-I':
                input_file_grps.extend(value.split(','))
            elif option == '-O':
                output_file_grps.extend(value.split(','))
            else:
                parameter_path = value
            tokens = tokens[2:]
        # Use ``cls`` so that subclasses get instances of their own type.
        return cls(executable, input_file_grps, output_file_grps, parameter_path)

    def __init__(self, executable, input_file_grps, output_file_grps, parameter_path=None):
        """
        Store the task definition; nothing is executed or validated here.
        """
        self.executable = executable
        self.input_file_grps = input_file_grps
        self.output_file_grps = output_file_grps
        self.parameter_path = parameter_path
        # Lazily filled cache for the ``ocrd_tool_json`` property.
        self._ocrd_tool_json = None

    @property
    def ocrd_tool_json(self):
        """
        Parsed JSON output of ``<executable> --dump-json``.

        The executable is run on first access only and the result is cached.
        ``subprocess.CalledProcessError`` propagates if the call exits with a
        non-zero status (``check=True``).
        """
        # Compare against None so that an empty (falsy) dump is cached as well
        # instead of re-running the executable on every access.
        if self._ocrd_tool_json is not None:
            return self._ocrd_tool_json
        result = run([self.executable, '--dump-json'], stdout=PIPE, check=True, universal_newlines=True)
        self._ocrd_tool_json = json.loads(result.stdout)
        return self._ocrd_tool_json

    def validate(self):
        """
        Check that this task can be run.

        Returns:
            True if every check passed.

        Raises:
            Exception: if the executable is not found in PATH, no input file
                group is set, the parameters fail ``ParameterValidator``, or
                the tool description contains ``output_file_grp`` while no
                output file group was given.
        """
        if not which(self.executable):
            raise Exception("Executable not found in PATH: %s" % self.executable)
        if not self.input_file_grps:
            raise Exception("Task must have input file group")
        parameters = {}
        if self.parameter_path:
            parameters = parse_json_string_or_file(self.parameter_path)
        param_validator = ParameterValidator(self.ocrd_tool_json)
        report = param_validator.validate(parameters)
        if not report.is_valid:
            raise Exception(report.errors)
        if 'output_file_grp' in self.ocrd_tool_json and not self.output_file_grps:
            raise Exception("Processor requires output_file_grp but none was provided.")
        return True

    def __str__(self):
        """
        Render the task in the same notation that ``parse`` accepts.
        """
        ret = '%s -I %s -O %s' % (
            self.executable.replace('ocrd-', '', 1),
            ','.join(self.input_file_grps),
            ','.join(self.output_file_grps))
        if self.parameter_path:
            ret += ' -p %s' % self.parameter_path
        return ret
76
77
def validate_tasks(tasks, workspace):
    """
    Validate a sequence of tasks against a workspace before running it.

    Every task is validated on its own (``task.validate()``), the first
    task's file groups are checked with ``WorkspaceValidator.check_file_grp``,
    and every later task's input file groups must either be listed in
    ``workspace.mets.file_groups`` or be produced by an earlier task.

    Args:
        tasks: non-empty list of ``ProcessorTask``
        workspace: workspace whose ``mets.file_groups`` lists existing groups

    Raises:
        Exception: if ``tasks`` is empty, a task fails its own validation, or
            the input/output file groups of the sequence are inconsistent.
    """
    if not tasks:
        raise Exception("Invalid task sequence: no tasks given")

    report = ValidationReport()
    # Work on a copy: extending the list below must never alter a list owned
    # by the METS object.
    prev_output_file_grps = list(workspace.mets.file_groups)

    first_task = tasks[0]
    first_task.validate()

    # first task: check input/output file groups from METS
    # TODO disable output_file_grps checks once CLI parameter 'overwrite' is implemented
    WorkspaceValidator.check_file_grp(workspace, first_task.input_file_grps, first_task.output_file_grps, report)

    # What later tasks may consume is what the first task *produces*, not what
    # it reads.
    prev_output_file_grps += first_task.output_file_grps
    for task in tasks[1:]:
        task.validate()
        # check either existing fileGrp or output-file group of previous task matches current input_file_group
        for input_file_grp in task.input_file_grps:
            if input_file_grp not in prev_output_file_grps:
                report.add_error("Input file group not contained in METS or produced by previous steps: %s" % input_file_grp)
        prev_output_file_grps += task.output_file_grps

    # TODO disable output_file_grps checks once CLI parameter 'overwrite' is implemented
    # Checked once, after all outputs are known, so the last task's output
    # groups are covered and each duplicate is reported a single time.
    duplicates = [grp for grp, count in Counter(prev_output_file_grps).items() if count >= 2]
    if duplicates:
        report.add_error("Output file group specified multiple times: %s" % duplicates)

    if not report.is_valid:
        raise Exception("Invalid task sequence input/output file groups: %s" % report.errors)
99
100
101
def run_tasks(mets, log_level, page_id, task_strs):
    """
    Parse, validate and sequentially execute a list of task descriptions.

    Args:
        mets: METS location, passed to ``Resolver.workspace_from_url`` and to
            every processor call
        log_level: forwarded to ``run_cli`` as ``log_level``
        page_id: forwarded to ``run_cli`` as ``page_id``
        task_strs: iterable of task descriptions understood by
            ``ProcessorTask.parse``

    Raises:
        Exception: if the sequence is invalid, a processor exits with a
            non-zero return value, or an expected output file group is absent
            from the METS after a task has run.
    """
    resolver = Resolver()
    workspace = resolver.workspace_from_url(mets)
    log = getLogger('ocrd.task_sequence.run_tasks')
    tasks = []
    for description in task_strs:
        tasks.append(ProcessorTask.parse(description))

    # Fail early, before any processor has been started.
    validate_tasks(tasks, workspace)

    for task in tasks:
        log.info("Start processing task '%s'", task)

        cli_options = {
            'log_level': log_level,
            'page_id': page_id,
            'input_file_grp': ','.join(task.input_file_grps),
            'output_file_grp': ','.join(task.output_file_grps),
            'parameter': task.parameter_path,
        }
        exit_status = run_cli(task.executable, mets, resolver, workspace, **cli_options)
        if exit_status != 0:
            raise Exception("%s exited with non-zero return value %s" % (task.executable, exit_status))

        log.info("Finished processing task '%s'", task)

        # Reload the METS before checking which file groups are now listed.
        workspace.reload_mets()

        for expected_grp in task.output_file_grps:
            if expected_grp in workspace.mets.file_groups:
                continue
            raise Exception("Invalid state: expected output file group not in mets: %s" % expected_grp)
140
141