import os
import re
import sys
import argparse
from operator import itemgetter
from collections import OrderedDict
import pandas as pd
from configparser import ConfigParser
from gensim.corpora import Dictionary
from gensim.models.tfidfmodel import TfidfModel

from .modeling.dataset_extraction import CategoryToFieldsGenerator
from .dataset import TextDataset
from topic_modeling_toolkit.processors import Pipeline

from .definitions import IDEOLOGY_CLASS_NAME, COOCURENCE_DICT_FILE_NAMES  # = ['cooc_tf_', 'cooc_df_', 'ppmi_tf_', 'ppmi_df_']

import logging

logger = logging.getLogger(__name__)


class PipeHandler(object):
    def __init__(self):
        self.cat2textgen_proc = None
        self.text_generator = None
        self.doc_gen_stats = {}
        self.dct = None
        self.corpus = None
        self.nb_docs = 0
        self._pipeline = None
        self.dataset = None
        self._collection = ''
        self.vocab_file = ''
        self.uci_file = ''
        self.vowpal_file = ''
        self.outlet_ids = []
        self._pack_data = None
        self._data_models = {}
        self._data_model2constructor = {'counts': lambda x: x,
                                        'tfidf': TfidfModel}
        self._vec_gen = {'counts': lambda bow_model: (_ for _ in bow_model),
                         'tfidf': lambda bow_model: (_ for _ in map(lambda x: self._data_models['tfidf'][x], bow_model))}
        # transformers from an enumerated bow vector (doc_index, vector) to the record each disk writer expects
        self._format_data_tr = {
            'uci': lambda x: x[1],
            'vowpal': lambda x: [[(self.dct[y[0]], y[1]) for y in x[1]],
                                 {IDEOLOGY_CLASS_NAME: self.label(self.outlet_ids[x[0]])}]
        }
        self._labels_hash = {}

    @property
    def labels_hash(self):
        return self._labels_hash

    @labels_hash.setter
    def labels_hash(self, outlet_id2document_label_hash):
        self._labels_hash = outlet_id2document_label_hash

    def label(self, outlet_id):
        return self._labels_hash[outlet_id]

    @property
    def labels(self):
        return [self.label(x) for x in self.outlet_ids]

    @property
    def pipeline(self):
        return self._pipeline

    @pipeline.setter
    def pipeline(self, pipeline):
        """Set the processing pipeline for the handler to use.

        :param str or processors.pipeline.Pipeline pipeline: a path to a pipeline cfg file or an already built Pipeline object
        """
        if isinstance(pipeline, str):
            self._pipeline = Pipeline.from_cfg(pipeline)
        else:
            self._pipeline = pipeline

    def process(self, pipeline, category, sample='all', verbose=False):
        self.pipeline = pipeline
        if verbose:
            print(self._pipeline)
        self.pipe_through_processors(category, num_docs=sample)

    def persist(self, dataset_path, labels_hash, class_names, add_class_labels_to_vocab=True):
        self._prepare_storing(dataset_path)
        self._labels_hash = labels_hash
        self.pipe_through_disk_writers()
        self.class_names = class_names
        self.write_vocab(dataset_path, add_class_labels=add_class_labels_to_vocab)
        return self.create_dataset(dataset_path)

    def preprocess(self, category, pipeline, collection_path, labels_hash, class_names, sample='all', add_class_labels_to_vocab=True):
        self.process(pipeline, category, sample=sample)
        return self.persist(collection_path, labels_hash, class_names, add_class_labels_to_vocab=add_class_labels_to_vocab)

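    # A minimal usage sketch of the full preprocess workflow (the paths, the 'posts'
    # category and the label values below are hypothetical, assuming a pipeline cfg file
    # parsable by Pipeline.from_cfg):
    #
    #   handler = PipeHandler()
    #   dataset = handler.preprocess(
    #       'posts',                                # category to extract documents from
    #       '/path/to/pipeline.cfg',                # or an already built Pipeline object
    #       '/path/to/collections/my_collection',   # target dataset directory
    #       {'1': 'liberal', '2': 'conservative'},  # labels_hash: outlet_id -> class label
    #       ['liberal', 'conservative'],            # class_names
    #       sample=500)                             # or 'all'
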
    def _prepare_storing(self, dataset_path):
        self._collection = os.path.basename(dataset_path)
        self.uci_file = os.path.join(dataset_path, 'docword.{}.txt'.format(self._collection))
        self.vowpal_file = os.path.join(dataset_path, 'vowpal.{}.txt'.format(self._collection))
        self.pipeline.initialize(file_paths=[self.uci_file, self.vowpal_file])

    #####
    def pipe_through_processors(self, category, num_docs='all'):
        doc_gens = []
        self.outlet_ids = []
        self.doc_gen_stats['corpus-tokens'] = 0
        self.cat2textgen_proc = CategoryToFieldsGenerator(('text', 'poster_id'), nb_docs=num_docs)
        self.text_generator = self.cat2textgen_proc.process(category)
        print(self.cat2textgen_proc, '\n')
        for i, doc in enumerate(self.text_generator):
            doc_gens.append(self._pipeline.pipe_through(doc['text'], len(self._pipeline) - 2))
            self.outlet_ids.append(str(doc['poster_id']))  # index outlet (document author) ids

        self.dct = self._pipeline[self._pipeline.processors_names.index('dict-builder')][1].state
        print()
        self._print_dict_stats()
        print("SAMPLE LEXICAL ITEMS:\n{}".format(
            '\n'.join('{}: {}'.format(gram_id, gram) for gram_id, gram in sorted(self.dct.items(), key=itemgetter(0))[:5])))

        tokens = [[token for token in tok_gen] for tok_gen in doc_gens]

        # print corpus stats before applying 'below' and 'above' filtering
        c = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens]
        self._print_bow_model_stats(c)
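
        # Note: gensim's doc2bow maps a token list to sparse (token_id, count) pairs; e.g.
        # dct.doc2bow(['nation', 'state', 'nation']) -> [(3, 2), (7, 1)] (ids made up here,
        # they depend on the dictionary at hand).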
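
        # gensim semantics: no_below is an absolute document count and no_above a corpus
        # fraction; tokens outside these bounds are dropped from the dictionary, and
        # compactify() below reassigns the surviving ids to a contiguous range.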
        print(' -- filter extremes -- ')
        self.dct.filter_extremes(no_below=self._pipeline.settings['nobelow'],
                                 no_above=self._pipeline.settings['noabove'])
        self._print_dict_stats()

        print(' -- compactify -- ')
        self.dct.compactify()
        self._print_dict_stats()

        self.corpus = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens]
        self._print_bow_model_stats(self.corpus)

        # REMOVE EMPTY DOCS, keeping the corpus and the outlet-id labels aligned
        c = [_ for _ in self.corpus if _]
        self.corpus, self.outlet_ids = (list(x) for x in zip(*[[doc, label] for doc, label in zip(self.corpus, self.outlet_ids) if doc]))
        assert len(c) == len(self.corpus) == len(self.outlet_ids)
        self._print_bow_model_stats(self.corpus)
        print()

    def pipe_through_disk_writers(self):
        """Call to pass through the last BaseDiskWriter processors of the pipeline.

        Assumes the last non-BaseDiskWriter processor in the pipeline is a 'weight' processor,
        so that a 'counts' or 'tfidf' token weight model has been computed.
        """
        if len(self.corpus) != len(self.outlet_ids):
            logger.warning("Please fix the logic because there is a mismatch between documents and labels: {} != {}".format(len(self.corpus), len(self.outlet_ids)))

        for _, processor in self.pipeline.disk_writers:
            for i, vector in enumerate(self._get_iterable_data_model(self.pipeline.settings['weight'])):  # 'counts' only supported (future work: 'tfidf')
                processor.process(self._format_data_tr[processor.to_id()]((i, vector)))
        self.doc_gen_stats.update({'docs-gen': self.cat2textgen_proc.nb_processed, 'docs-failed': len(self.cat2textgen_proc.failed)})

        # The first 3 lines of a UCI-formatted file correspond to nb_docs, vocab_size and the
        # total number of bow tuples over all documents; they must be written at the top.
        prologue_lines = [str(x) for x in (self.dct.num_docs, len(self.dct.items()), sum(len(_) for _ in self.corpus))]
        self.pipeline.finalize([prologue_lines])

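    # For reference, the docword file then starts like this (standard UCI bag-of-words
    # layout; the numbers below are made up, not actual output of this module):
    #
    #   3            <- number of documents (D)
    #   4122         <- vocabulary size (W)
    #   51208        <- total number of bow tuples (NNZ)
    #   1 17 2       <- docID wordID count triples follow
    #   1 25 1
    #   ...
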
    def _get_iterable_data_model(self, data_model):
        if data_model not in self._data_models:
            self._data_models[data_model] = self._data_model2constructor[data_model](self.corpus)
        return self._vec_gen[data_model](self.corpus)

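    # A sketch of the two weight models selectable via the 'weight' setting ('tfidf' is
    # marked as future work in pipe_through_disk_writers above):
    #
    #   self._get_iterable_data_model('counts')  # yields the raw (token_id, count) vectors
    #   self._get_iterable_data_model('tfidf')   # yields (token_id, weight) vectors through
    #                                            # gensim's TfidfModel fitted on self.corpus
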
    #######
    def write_vocab(self, dataset_path, add_class_labels=True):
        # Define the file and dump the vocabulary (list of unique tokens, one per line)
        self.vocab_file = os.path.join(dataset_path, 'vocab.{}.txt'.format(os.path.basename(dataset_path)))
        if not os.path.isfile(self.vocab_file):
            with open(self.vocab_file, 'w') as f:
                for string_id, string in self._vocab_tokens_generator(include_class_labels=add_class_labels):
                    try:
                        f.write('{}\n'.format(string))
                    except UnicodeEncodeError:
                        print('FAILED', type(string_id), string)
                        raise
            print("Created '{}' file".format(self.vocab_file))
        else:
            print("File '{}' already exists. Skipping.".format(self.vocab_file))

    def _vocab_tokens_generator(self, include_class_labels=True):
        for gram_id, gram_string in self.dct.items():
            yield gram_id, gram_string
        if include_class_labels:
            for class_label in [_ for _ in self.class_names if _ in set(self.labels)]:
                yield 'class_modality', '{} {}'.format(class_label, IDEOLOGY_CLASS_NAME)

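    # Each class-label entry above is rendered as '<class_label> <IDEOLOGY_CLASS_NAME>',
    # so the vocab file ends with one such line per label in use, e.g. (hypothetical
    # label value and modality name):
    #
    #   liberal @ideology_class
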
    #######
    def create_dataset(self, dataset_path):
        dataset = TextDataset(os.path.basename(dataset_path), self._get_dataset_id(),
                              len(self.corpus), len(self.dct.items()), sum(len(_) for _ in self.corpus),
                              self.uci_file, self.vocab_file, self.vowpal_file)
        dataset.root_dir = dataset_path
        dataset.save()
        return dataset

    def _get_dataset_id(self):
        # e.g. a (hypothetical) pipeline id 'lowercase_mono_100' and 1000 docs yield '1000_lowercase_mono.100'
        idd = self._pipeline.get_id()
        ri = idd.rfind('_')
        return str(len(self.corpus)) + '_' + idd[:ri] + '.' + idd[ri + 1:]

    ###### UTILS
    def _print_dict_stats(self):
        print("GENSIM-DICT:\nnum_pos (processed words): {}\nnum_nnz (nb of bow-tuples): {}\nvocab size: {}".format(
            self.dct.num_pos, self.dct.num_nnz, len(self.dct.items())))

    @classmethod
    def _print_bow_model_stats(cls, bow_corpus):
        print("BOW-MODEL:\nnumber of word positions (num_pos): {}\ntotal number of tuples (num_nnz): {}\nnumber of docs: {}\nempty docs: {}".format(
            sum(sum(bow_tuple[1] for bow_tuple in doc) for doc in bow_corpus), sum(len(_) for _ in bow_corpus), len(bow_corpus), len([_ for _ in bow_corpus if not _])))