processors.pipeline.Pipeline.get_id()  (grade: A)

Complexity
    Conditions    1

Size
    Total Lines   2
    Code Lines    2

Duplication
    Lines         0
    Ratio         0 %

Importance
    Changes       0

Metric   Value
cc       1
eloc     2
nop      1
dl       0
loc      2
rs       10
c        0
b        0
f        0
from collections import OrderedDict
from configparser import ConfigParser

from topic_modeling_toolkit.processors.string_processors import MonoSpacer, StringProcessor, LowerCaser, UtfEncoder, DeAccenter, StringLemmatizer
from topic_modeling_toolkit.processors.generator_processors import GeneratorProcessor, MinLengthFilter, MaxLengthFilter, WordToNgramGenerator
from topic_modeling_toolkit.processors.string2generator import StringToTokenGenerator
from topic_modeling_toolkit.processors import Processor, InitializationNeededComponent, FinalizationNeededComponent, BaseDiskWriterWithPrologue
from topic_modeling_toolkit.processors.mutators import GensimDictTokenGeneratorToListProcessor, OneElemListOfListToGenerator

from .disk_writer_processors import UciFormatWriter, VowpalFormatWriter
from .processor import BaseDiskWriter

import logging
logger = logging.getLogger(__name__)

# Maps each settings key to a factory; each factory returns a processor when the
# setting is enabled and None otherwise. The 'nobelow', 'noabove' and 'weight'
# entries pass the raw value through instead of building a processor object.
settings_value2processors = {
    'lowercase': lambda x: LowerCaser() if x else None,
    'monospace': lambda x: MonoSpacer() if x else None,
    'unicode': lambda x: UtfEncoder() if x else None,
    'deaccent': lambda x: DeAccenter() if x else None,
    'normalize': lambda x: StringLemmatizer() if x == 'lemmatize' else None,
    'minlength': lambda x: MinLengthFilter(x) if x else None,
    'maxlength': lambda x: MaxLengthFilter(x) if x else None,
    'ngrams': lambda x: WordToNgramGenerator(x) if x else None,
    'nobelow': lambda x: x if x else None,
    'noabove': lambda x: x if x else None,
    'weight': lambda x: x if x else None,
    'format': lambda x: {'uci': UciFormatWriter(), 'vowpal': VowpalFormatWriter()}[x] if x else None
}

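# A minimal construction sketch (hypothetical values; keys and values must come
# pre-encoded, the way Pipeline.from_cfg / Pipeline.from_tuples produce them,
# and StringProcessor settings must precede GeneratorProcessor settings):
#
#     settings = OrderedDict([
#         ('monospace', True), ('lowercase', True),   # StringProcessors first ...
#         ('minlength', 2), ('maxlength', 25),        # ... then GeneratorProcessors
#         ('format1', 'uci'),                         # a BaseDiskWriter output format
#     ])
#     pipeline = Pipeline(settings)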

class Pipeline(object):

    def __init__(self, settings):
        assert isinstance(settings, OrderedDict)
        # Collapse the enumerated output keys ('format1', 'format2', ...) to 'format'.
        self._parser = lambda x: ('format', x[1]) if x[0][:6] == 'format' else x
        self._settings = settings
        # Build each processor exactly once and keep only the enabled (non-None) ones.
        candidates = [(name, settings_value2processors[name](v)) for name, v in map(self._parser, settings.items())]
        self.processors_names = [name for name, processor in candidates if processor is not None]
        self.processors = [processor for name, processor in candidates if processor is not None]
        assert len(self.processors_names) == len(self.processors)
        self.str2gen_processor_index = 0
        self.token_gen2list_index = 0
        if not self._check_processors_pipeline():
            print(self)
            raise ProcessorsOrderNotSoundException('The first n components of the pipeline have to be StringProcessors and the following m GeneratorProcessors, with n, m > 0')
        if any(isinstance(x, MonoSpacer) for x in self.processors):
            self.str2gen_processor = StringToTokenGenerator(' ')
        else:
            print(self)
            raise SupportedTokenizerNotFoundException("The implemented 'single-space' tokenizer requires the presence of a MonoSpacer processor in the pipeline")
        self._inject_connectors()

    @property
    def settings(self):
        return self._settings

    def __len__(self):
        return len(self.processors)

    def __str__(self):
        b = '{}\nlen: {}\n'.format(type(self).__name__, len(self))
        b += '\n'.join('{}: {}'.format(pr_name, pr_instance) for pr_name, pr_instance in zip(self.processors_names, self.processors))
        return b

    def __getitem__(self, item):
        return self.processors_names[item], self.processors[item]

    def __iter__(self):
        for pr_name, pr in zip(self.processors_names, self.processors):
            yield pr_name, pr

    @property
    def transformers(self):
        # In-memory transformation components, i.e. every processor that is not a disk writer.
        return [(name, proc_obj) for name, proc_obj in self if not isinstance(proc_obj, BaseDiskWriter)]

    @property
    def disk_writers(self):
        return [(name, proc_obj) for name, proc_obj in self if isinstance(proc_obj, BaseDiskWriter)]

    def initialize(self, *args, **kwargs):
        """Call this method to initialize each of the pipeline's processors."""
        if self.disk_writers and 'file_paths' not in kwargs:
            logger.error("You have to supply the 'file_paths' list as a keyword argument, with one target file path per BaseDiskWriter processor.")
            return
        disk_writer_index = 0
        for pr_name, pr_obj in self:
            if isinstance(pr_obj, InitializationNeededComponent):
                pr_obj.initialize(file_paths=kwargs.get('file_paths', []), disk_writer_index=disk_writer_index)
                if isinstance(pr_obj, BaseDiskWriter):
                    disk_writer_index += 1

    def pipe_through(self, data, depth):
        """Pass data through the first 'depth' processors, in pipeline order."""
        for proc in self.processors[:depth]:
            if isinstance(proc, Processor):
                data = proc.process(data)
        return data

    def pipe_through_processing_units(self, data):
        """Pass data through every processor except the disk writers."""
        for proc in self.processors:
            if isinstance(proc, Processor) and not isinstance(proc, BaseDiskWriter):
                data = proc.process(data)
        return data

    def pipe_through_disk_writers(self, data, file_paths):
        """Initialize each disk writer with its target path, then write data out."""
        for i, (name, proc) in enumerate(self.disk_writers):
            proc.initialize(file_paths, i)
        for name, proc in self.disk_writers:
            data = proc.process(data)

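    # A hypothetical end-to-end sketch: run the in-memory processors first, then
    # hand the result to the disk writers (one target path per writer):
    #
    #     tokens = pipeline.pipe_through_processing_units(document_string)
    #     pipeline.pipe_through_disk_writers(tokens, ['/tmp/corpus.uci'])
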
    def finalize(self, prologs=()):
        # 'prologs' supplies one prologue per BaseDiskWriterWithPrologue, in pipeline order.
        i = 0
        for pr_name, pr_obj in self:
            if isinstance(pr_obj, BaseDiskWriterWithPrologue):
                pr_obj.finalize(prologs[i])
                i += 1
            elif isinstance(pr_obj, FinalizationNeededComponent):
                pr_obj.finalize()

    def _insert(self, index, processor, type_name):
        self.processors.insert(index, processor)
        self.processors_names.insert(index, type_name)

    def _inject_connectors(self):
        assert self.str2gen_processor_index != 0 and self.token_gen2list_index != 0
        # After the string processors, tokenize each string into a token generator.
        self._insert(self.str2gen_processor_index, self.str2gen_processor, 'str2token_gen')
        # After the generator processors, materialize the tokens and re-expose them as a generator.
        self._insert(self.token_gen2list_index + 1, GensimDictTokenGeneratorToListProcessor(), 'dict-builder')
        self._insert(self.token_gen2list_index + 2, OneElemListOfListToGenerator(), 'list2generator')

    def _check_processors_pipeline(self):
        """Require n StringProcessors followed by m GeneratorProcessors, with n, m > 0.

        Also records the indices where the string-to-generator and
        generator-to-list connectors should be injected.
        """
        i = 0
        proc = self[i][1]
        while isinstance(proc, StringProcessor):
            i += 1
            proc = self[i][1]
        if i == 0:  # no leading StringProcessor
            return False
        l1 = i
        self.str2gen_processor_index = l1
        while isinstance(proc, GeneratorProcessor):
            i += 1
            proc = self[i][1]
        if i == l1:  # no GeneratorProcessor after the StringProcessors
            return False
        self.token_gen2list_index = i
        return True

    @staticmethod
    def _tuple2string(pipeline_component, value):
        if isinstance(value, bool):
            return pipeline_component if value else ''
        elif pipeline_component == 'format':
            return value
        else:
            return '{}-{}'.format(pipeline_component, value)

    def get_id(self):
        """Derive a string identifier from the pipeline settings."""
        return '_'.join(s for s in (self._tuple2string(component, value) for component, value in self._settings.items()) if s)

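    # For the hypothetical settings shown above the class, get_id() would yield:
    #
    #     'monospace_lowercase_minlength-2_maxlength-25_format1-uci'
    #
    # (an enabled boolean setting contributes its bare name, a key equal to
    # 'format' contributes just its value, and everything else a 'name-value' pair).
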
    @classmethod
    def from_cfg(cls, cfg_file_path):
        """Build a Pipeline from the [preprocessing] section of a cfg file."""
        config = ConfigParser()
        config.read(cfg_file_path)
        pipe_settings = OrderedDict(
            item for setting in config.items('preprocessing') for item in _expand_setting(setting))
        return Pipeline(pipe_settings)

    @classmethod
    def from_tuples(cls, data):
        """Build a Pipeline from an iterable of raw (key, value) string pairs."""
        return Pipeline(OrderedDict(
            item for setting in data for item in _expand_setting(setting)))

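# A from_cfg input sketch (hypothetical file contents; keys mirror
# encode_pipeline_cfg, and 'format' may hold several comma-separated writers):
#
#     [preprocessing]
#     lowercase = True
#     monospace = True
#     minlength = 2
#     maxlength = 25
#     format = uci,vowpal
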
def _expand_setting(setting):
    """Encode one raw (key, value) pair; a 'format' value may be a comma-separated
    list and is expanded to ('format1', ...), ('format2', ...) entries."""
    key, value = setting
    if key == 'format':
        return [('{}{}'.format(key, i + 1), out_frmt) for i, out_frmt in enumerate(value.split(','))]
    return [(key, encode_pipeline_cfg[key](value))]


# Converters from raw cfg string values to the types Pipeline expects. Note that
# the boolean entries eval() their input, so values must be Python literals such
# as 'True' or 'False'.
encode_pipeline_cfg = {
    'lowercase': lambda x: bool(eval(x)),
    'monospace': lambda x: bool(eval(x)),
    'unicode': lambda x: bool(eval(x)),
    'deaccent': lambda x: bool(eval(x)),
    'normalize': str,
    'minlength': int,
    'maxlength': int,
    'nobelow': int,
    'noabove': float,
    'ngrams': int,
    'weight': str,
    'format': str
}


class SupportedTokenizerNotFoundException(Exception): pass
class ProcessorsOrderNotSoundException(Exception): pass