Passed
Push — master ( 5944d9...b0dff0 )
by Konstantin
02:14
created

ocrd.task_sequence   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 142
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 29
eloc 107
dl 0
loc 142
rs 10
c 0
b 0
f 0

5 Methods

Rating   Name   Duplication   Size   Complexity  
B ProcessorTask.parse() 0 22 7
A ProcessorTask.__str__() 0 8 2
A ProcessorTask.__init__() 0 6 1
A ProcessorTask.ocrd_tool_json() 0 7 2
B ProcessorTask.validate() 0 15 7

2 Functions

Rating   Name   Duplication   Size   Complexity  
B run_tasks() 0 39 5
A validate_tasks() 0 24 5
1
import json
2
from shlex import split as shlex_split
3
from distutils.spawn import find_executable as which # pylint: disable=import-error,no-name-in-module
4
from subprocess import run, PIPE
5
from collections import Counter
6
7
from ocrd_utils import getLogger, parse_json_string_or_file
8
from ocrd.processor.base import run_cli
9
from ocrd.resolver import Resolver
10
from ocrd_validators import ParameterValidator, WorkspaceValidator, ValidationReport
11
12
class ProcessorTask():
    """
    One step of a task sequence: an OCR-D processor executable together with
    its input/output file groups and an optional parameter file.
    """

    @classmethod
    def parse(cls, argstr):
        """
        Parse a task description such as ``'dummy -I IN1,IN2 -O OUT -p params.json'``.

        The first token names the processor; ``ocrd-`` is prepended to form the
        executable name. ``-I`` and ``-O`` take comma-separated file group lists
        (and may be repeated, accumulating groups); ``-p`` takes the parameter
        path (the last occurrence wins). The string is tokenized with
        ``shlex.split``, so shell-style quoting is honoured.

        Returns:
            an instance of ``cls``

        Raises:
            Exception: if the description is empty, contains an unknown token,
                or an option is missing its argument.
        """
        tokens = shlex_split(argstr)
        if not tokens:
            raise Exception("Failed parsing task description '%s': no processor given" % argstr)
        executable = 'ocrd-%s' % tokens.pop(0)
        input_file_grps = []
        output_file_grps = []
        parameter_path = None
        while tokens:
            option = tokens[0]
            if option not in ('-I', '-O', '-p'):
                raise Exception("Failed parsing task description '%s' with tokens remaining: '%s'" % (argstr, tokens))
            # every supported option takes exactly one argument
            if len(tokens) < 2:
                raise Exception("Failed parsing task description '%s': option '%s' requires an argument" % (argstr, option))
            value = tokens[1]
            if option == '-I':
                input_file_grps.extend(value.split(','))
            elif option == '-O':
                output_file_grps.extend(value.split(','))
            else:
                parameter_path = value
            tokens = tokens[2:]
        # use cls so subclasses get instances of themselves
        return cls(executable, input_file_grps, output_file_grps, parameter_path)

    def __init__(self, executable, input_file_grps, output_file_grps, parameter_path=None):
        """
        Args:
            executable: full executable name, e.g. ``ocrd-dummy``
            input_file_grps: list of input file group names
            output_file_grps: list of output file group names
            parameter_path: optional parameter path, or None
        """
        self.executable = executable
        self.input_file_grps = input_file_grps
        self.output_file_grps = output_file_grps
        self.parameter_path = parameter_path
        # lazily filled by the ocrd_tool_json property
        self._ocrd_tool_json = None

    @property
    def ocrd_tool_json(self):
        """
        The processor's tool description, parsed from the JSON printed by
        ``<executable> --dump-json``. The subprocess runs at most once per
        instance; the result is cached.

        Raises:
            subprocess.CalledProcessError: if the executable exits non-zero.
        """
        # compare against None so an empty description is cached as well
        if self._ocrd_tool_json is None:
            result = run([self.executable, '--dump-json'], stdout=PIPE, check=True, universal_newlines=True)
            self._ocrd_tool_json = json.loads(result.stdout)
        return self._ocrd_tool_json

    def validate(self):
        """
        Check that this task can be run.

        Checks, in order: the executable is found on ``PATH``; at least one
        input file group is given; the parameters (loaded via
        ``parse_json_string_or_file`` from ``parameter_path``, or ``{}`` if
        unset) pass ``ParameterValidator`` for the tool description; and an
        output file group is given if the tool description has an
        ``output_file_grp`` entry.

        Returns:
            the (valid) parameter validation report

        Raises:
            Exception: on the first failed check
        """
        # NOTE(review): distutils' find_executable is deprecated (PEP 632);
        # shutil.which is the stdlib replacement.
        if not which(self.executable):
            raise Exception("Executable not found in PATH: %s" % self.executable)
        if not self.input_file_grps:
            raise Exception("Task must have input file group")
        parameters = {}
        if self.parameter_path:
            parameters = parse_json_string_or_file(self.parameter_path)
        param_validator = ParameterValidator(self.ocrd_tool_json)
        report = param_validator.validate(parameters)
        if not report.is_valid:
            raise Exception(report.errors)
        if 'output_file_grp' in self.ocrd_tool_json and not self.output_file_grps:
            raise Exception("Processor requires output_file_grp but none was provided.")
        return report

    def __str__(self):
        """
        Render the task in the same syntax that ``parse`` accepts.

        NOTE(review): with no output groups this yields a trailing ``-O `` with
        no argument, which ``parse`` rejects; paths are not shell-quoted.
        """
        ret = '%s -I %s -O %s' % (
            self.executable.replace('ocrd-', '', 1),
            ','.join(self.input_file_grps),
            ','.join(self.output_file_grps))
        if self.parameter_path:
            ret += ' -p %s' % self.parameter_path
        return ret
76
77
def validate_tasks(tasks, workspace):
    """
    Check that a sequence of tasks fits together and fits the workspace.

    Every task is validated individually with ``ProcessorTask.validate``. The
    first task's input/output file groups are checked against the METS with
    ``WorkspaceValidator.check_file_grp``. Each later task's input file groups
    must either already exist in the METS or be an output file group of an
    earlier task.

    Args:
        tasks: list of ProcessorTask in execution order
        workspace: workspace whose ``mets.file_groups`` lists existing fileGrps

    Returns:
        the ValidationReport with the file group checks (an empty report for
        an empty task list)

    Raises:
        Exception: if a task is invalid or the file group chain is broken
    """
    report = ValidationReport()
    if not tasks:
        return report
    # copy, so the += below never mutates a list owned by the METS object
    prev_output_file_grps = list(workspace.mets.file_groups)

    first_task = tasks[0]
    first_task.validate()

    # first task: check input/output file groups from METS
    # TODO disable output_file_grps checks once CLI parameter 'overwrite' is implemented
    WorkspaceValidator.check_file_grp(workspace, first_task.input_file_grps, first_task.output_file_grps, report)

    # later tasks may consume what the first task produces
    prev_output_file_grps += first_task.output_file_grps
    for task in tasks[1:]:
        task.validate()
        # check either existing fileGrp or output-file group of previous task matches current input_file_group
        for input_file_grp in task.input_file_grps:
            if input_file_grp not in prev_output_file_grps:
                report.add_error("Input file group not contained in METS or produced by previous steps: %s" % input_file_grp)
        # TODO disable output_file_grps checks once CLI parameter 'overwrite' is implemented
        # XXX Thu Jan 16 20:14:17 CET 2020 still not sufficiently clever.
        #  if len(prev_output_file_grps) != len(set(prev_output_file_grps)):
        #      report.add_error("Output file group specified multiple times: %s" %
        #          [grp for grp, count in Counter(prev_output_file_grps).items() if count >= 2])
        prev_output_file_grps += task.output_file_grps
    if not report.is_valid:
        raise Exception("Invalid task sequence input/output file groups: %s" % report.errors)
    return report
101
102
103
def run_tasks(mets, log_level, page_id, task_strs):
    """
    Parse, validate and execute a sequence of processor tasks on one METS.

    Tasks run one after another via ``run_cli``. After each task the METS is
    reloaded and every declared output file group must be present in it.

    Args:
        mets: METS location, passed to ``Resolver.workspace_from_url`` and ``run_cli``
        log_level: log level forwarded to every processor call
        page_id: page selection forwarded to every processor call
        task_strs: task descriptions in ``ProcessorTask.parse`` syntax

    Raises:
        Exception: if validation fails, a processor returns non-zero, or an
            expected output file group is missing after a task ran
    """
    resolver = Resolver()
    workspace = resolver.workspace_from_url(mets)
    logger = getLogger('ocrd.task_sequence.run_tasks')
    tasks = list(map(ProcessorTask.parse, task_strs))

    validate_tasks(tasks, workspace)

    for current in tasks:
        logger.info("Start processing task '%s'", current)

        exit_code = run_cli(
            current.executable, mets, resolver, workspace,
            log_level=log_level,
            page_id=page_id,
            input_file_grp=','.join(current.input_file_grps),
            output_file_grp=','.join(current.output_file_grps),
            parameter=current.parameter_path,
        )
        if exit_code != 0:
            raise Exception("%s exited with non-zero return value %s" % (current.executable, exit_code))

        logger.info("Finished processing task '%s'", current)

        # the processor wrote to the METS on disk; pick up its changes
        workspace.reload_mets()

        # first declared output group that did not make it into the METS, if any
        missing_grp = next(
            (grp for grp in current.output_file_grps if grp not in workspace.mets.file_groups),
            None)
        if missing_grp is not None:
            raise Exception("Invalid state: expected output file group not in mets: %s" % missing_grp)
142
143