1
|
|
|
import ast
import logging
from collections import OrderedDict
from configparser import ConfigParser

from topic_modeling_toolkit.processors import Processor, InitializationNeededComponent, FinalizationNeededComponent, BaseDiskWriterWithPrologue
from topic_modeling_toolkit.processors.string_processors import MonoSpacer, StringProcessor, LowerCaser, UtfEncoder, DeAccenter, StringLemmatizer
from topic_modeling_toolkit.processors.generator_processors import GeneratorProcessor, MinLengthFilter, MaxLengthFilter, WordToNgramGenerator
from topic_modeling_toolkit.processors.string2generator import StringToTokenGenerator
from topic_modeling_toolkit.processors.mutators import GensimDictTokenGeneratorToListProcessor, OneElemListOfListToGenerator

from .disk_writer_processors import UciFormatWriter, VowpalFormatWriter
from .processor import BaseDiskWriter
15
|
|
|
# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)
16
|
|
|
|
17
|
|
|
# Maps each preprocessing-settings key to a factory that turns the configured
# value into a processor instance (or a passthrough value). Every factory
# returns None for a falsy value, which the Pipeline uses to skip the step.
settings_value2processors = {
    'lowercase': lambda x: LowerCaser() if x else None,
    'monospace': lambda x: MonoSpacer() if x else None,
    'unicode': lambda x: UtfEncoder() if x else None,
    'deaccent': lambda x: DeAccenter() if x else None,
    # Only the 'lemmatize' normalization strategy is supported.
    'normalize': lambda x: StringLemmatizer() if x == 'lemmatize' else None,
    'minlength': lambda x: MinLengthFilter(x) if x else None,
    'maxlength': lambda x: MaxLengthFilter(x) if x else None,
    'ngrams': lambda x: WordToNgramGenerator(x) if x else None,
    # These three carry their configured value through unchanged.
    'nobelow': lambda x: x if x else None,
    'noabove': lambda x: x if x else None,
    'weight': lambda x: x if x else None,
    # Select the writer class first, then construct exactly one instance.
    # (Previously both writers were instantiated and one discarded per call.)
    'format': lambda x: {'uci': UciFormatWriter, 'vowpal': VowpalFormatWriter}[x]() if x else None
}
31
|
|
|
|
32
|
|
|
|
33
|
|
|
class Pipeline(object):
    """An ordered chain of text processors built from an OrderedDict of settings.

    The pipeline must start with one or more StringProcessors followed by one
    or more GeneratorProcessors; connector components (a whitespace tokenizer
    and generator/list mutators) are injected automatically between the two
    groups. A MonoSpacer must be present, since the built-in tokenizer splits
    on single spaces.
    """

    def __init__(self, settings):
        """Build the processor chain from *settings*.

        :param OrderedDict settings: maps setting names (e.g. 'lowercase',
            'format1') to their configured values; iteration order defines
            processor order.
        :raises ProcessorsOrderNotSoundException: if the processors are not
            StringProcessors followed by GeneratorProcessors (n, m > 0).
        :raises SupportedTokenizerNotFoundException: if no MonoSpacer is
            configured.
        """
        assert isinstance(settings, OrderedDict)
        # Keys like 'format1', 'format2' all dispatch to the generic 'format' factory.
        self._parser = lambda x: ('format', x[1]) if x[0][:6] == 'format' else x
        self._settings = settings
        # Construct each processor exactly once (the previous implementation
        # invoked every factory twice and discarded one full set of instances).
        named = [(name, settings_value2processors[name](value))
                 for name, value in map(self._parser, settings.items())]
        named = [(name, proc) for name, proc in named if proc is not None]
        self.processors_names = [name for name, _ in named]
        self.processors = [proc for _, proc in named]
        assert len(self.processors_names) == len(self.processors)
        self.str2gen_processor_index = 0
        self.token_gen2list_index = 0
        if not self._check_processors_pipeline():
            print(self)
            raise ProcessorsOrderNotSoundException('The first n components of the pipeline have to be StringProcessors and the following m GeneratorProcessors with n,m>0')
        if any(isinstance(x, MonoSpacer) for x in self.processors):
            self.str2gen_processor = StringToTokenGenerator(' ')
        else:
            print(self)
            raise SupportedTokenizerNotFoundException('The implemented \'single-space\' tokenizer requires the presence of a MonoSpacer processor in the pipeline')
        self._inject_connectors()

    @property
    def settings(self):
        """The OrderedDict of settings this pipeline was built from."""
        return self._settings

    def __len__(self):
        """Number of processors currently in the chain (connectors included)."""
        return len(self.processors)

    def __str__(self):
        header = '{}\nlen: {}\n'.format(type(self).__name__, len(self))
        body = '\n'.join('{}: {}'.format(name, proc)
                         for name, proc in zip(self.processors_names, self.processors))
        return header + body

    def __getitem__(self, item):
        """Return the (name, processor) pair (or pairs, for a slice) at *item*."""
        return self.processors_names[item], self.processors[item]

    def __iter__(self):
        """Yield (name, processor) pairs in pipeline order."""
        for name, proc in zip(self.processors_names, self.processors):
            yield name, proc

    @property
    def transformers(self):
        # NOTE(review): this filter is identical to `disk_writers` below —
        # presumably it was meant to select non-writer Processors instead.
        # Behavior kept as-is; confirm intent against callers.
        return [(name, proc_obj) for name, proc_obj in self if isinstance(proc_obj, BaseDiskWriter)]

    @property
    def disk_writers(self):
        """The (name, processor) pairs that write output to disk."""
        return [(name, proc_obj) for name, proc_obj in self if isinstance(proc_obj, BaseDiskWriter)]

    def initialize(self, *args, **kwargs):
        """Call this method to initialize each of the pipeline's processors.

        :keyword file_paths: list of target file paths, one per BaseDiskWriter
            processor; required when the pipeline contains any disk writers.
        """
        if self.disk_writers and 'file_paths' not in kwargs:
            logger.error("You have to supply the 'file_paths' list as a key argument, with each element being the target file path one per BaseDiskWriter processor.")
            return
        disk_writer_index = 0
        for pr_name, pr_obj in self:
            if isinstance(pr_obj, InitializationNeededComponent):
                pr_obj.initialize(file_paths=kwargs.get('file_paths', []), disk_writer_index=disk_writer_index)
            if isinstance(pr_obj, BaseDiskWriter):
                disk_writer_index += 1

    def pipe_through(self, data, depth):
        """Push *data* through the first *depth* processors and return the result."""
        for proc in self.processors[:depth]:
            if isinstance(proc, Processor):
                data = proc.process(data)
        return data

    def pipe_through_processing_units(self, data):
        """Push *data* through every non-disk-writer Processor and return the result."""
        for proc in self.processors:
            if isinstance(proc, Processor) and not isinstance(proc, BaseDiskWriter):
                data = proc.process(data)
        return data

    def pipe_through_disk_writers(self, data, file_paths):
        """Initialize every disk writer against *file_paths*, then feed *data* through them."""
        for i, (name, proc) in enumerate(self.disk_writers):
            proc.initialize(file_paths, i)
        for name, proc in self.disk_writers:
            data = proc.process(data)

    def finalize(self, prologs=tuple([])):
        """Finalize components that need it; *prologs* are consumed in order,
        one per BaseDiskWriterWithPrologue in the pipeline."""
        i = 0
        for pr_name, pr_obj in self:
            if isinstance(pr_obj, BaseDiskWriterWithPrologue):
                pr_obj.finalize(prologs[i])
                i += 1
            elif isinstance(pr_obj, FinalizationNeededComponent):
                pr_obj.finalize()

    def _insert(self, index, processor, type_name):
        # Insert a connector processor keeping names and instances aligned.
        self.processors.insert(index, processor)
        self.processors_names.insert(index, type_name)

    def _inject_connectors(self):
        """Insert the tokenizer and generator/list mutators between the
        string-processor and generator-processor sections."""
        assert (self.str2gen_processor_index != 0 and self.token_gen2list_index != 0)
        # After the string processors: turn the string into a token generator.
        self._insert(self.str2gen_processor_index, self.str2gen_processor, 'str2token_gen')
        # After the generator processors: build the gensim dict, then restore a generator.
        self._insert(self.token_gen2list_index + 1, GensimDictTokenGeneratorToListProcessor(), 'dict-builder')
        self._insert(self.token_gen2list_index + 2, OneElemListOfListToGenerator(), 'list2generator')

    def _check_processors_pipeline(self):
        """Return True iff the chain is n StringProcessors then m
        GeneratorProcessors with n, m > 0; records the two section boundaries.

        Bounds checks prevent the IndexError the previous version raised when
        the whole pipeline was one kind of processor.
        """
        i = 0
        while i < len(self) and isinstance(self[i][1], StringProcessor):
            i += 1
        if i == 0:
            return False
        l1 = i
        self.str2gen_processor_index = l1
        while i < len(self) and isinstance(self[i][1], GeneratorProcessor):
            i += 1
        if i == l1:
            return False
        self.token_gen2list_index = i
        return True

    @staticmethod
    def _tuple2string(pipeline_component, value):
        """Render one (setting, value) pair as its id fragment; '' means omit."""
        if type(value) == bool:
            if value:
                return pipeline_component
            else:
                return ''
        elif pipeline_component == 'format':
            return value
        else:
            return '{}-{}'.format(pipeline_component, value)

    def get_id(self):
        """Build a compact string id for this pipeline from its settings."""
        return '_'.join(_ for _ in [self._tuple2string(pipeline_component, value) for pipeline_component, value in self._settings.items()] if _)

    @staticmethod
    def _encode_raw_settings(items):
        """Encode raw (key, string-value) pairs into pipeline settings.

        A 'format' entry holding a comma-separated list is expanded into
        'format1', 'format2', ... entries with the raw format strings; every
        other value goes through its encode_pipeline_cfg converter.
        """
        for key, value in items:
            if key == 'format':
                for i, out_frmt in enumerate(value.split(',')):
                    yield key + str(i + 1), out_frmt
            else:
                yield key, encode_pipeline_cfg[key](value)

    @classmethod
    def from_cfg(cls, cfg_file_path):
        """Build a Pipeline from the [preprocessing] section of a cfg file."""
        config = ConfigParser()
        config.read(cfg_file_path)
        return cls(OrderedDict(cls._encode_raw_settings(config.items('preprocessing'))))

    @classmethod
    def from_tuples(cls, data):
        """Build a Pipeline from an iterable of raw (key, string-value) tuples."""
        return cls(OrderedDict(cls._encode_raw_settings(data)))
214
|
|
|
|
215
|
|
|
def _parse_bool(value):
    """Parse a boolean-ish config string ('True', 'False', '1', '0') safely.

    Replaces the previous bool(eval(value)), which executed arbitrary
    expressions found in the config file.
    """
    return bool(ast.literal_eval(value))


# Converters from the raw config-file strings to typed setting values.
encode_pipeline_cfg = {
    'lowercase': _parse_bool,
    'monospace': _parse_bool,
    'unicode': _parse_bool,
    'deaccent': _parse_bool,
    'normalize': str,
    'minlength': int,
    'maxlength': int,
    'nobelow': int,
    'noabove': float,
    'ngrams': int,
    'weight': str,
    'format': str
}
229
|
|
|
|
230
|
|
|
|
231
|
|
|
class SupportedTokenizerNotFoundException(Exception):
    """Raised when no MonoSpacer is configured, so the built-in single-space tokenizer cannot be used."""
232
|
|
|
class ProcessorsOrderNotSoundException(Exception):
    """Raised when the pipeline is not ordered as StringProcessors followed by GeneratorProcessors."""
233
|
|
|
|
234
|
|
|
|
235
|
|
|
|