|
1
|
|
|
import os |
|
2
|
|
|
import re |
|
3
|
|
|
import sys |
|
4
|
|
|
import argparse |
|
5
|
|
|
from operator import itemgetter |
|
6
|
|
|
from collections import OrderedDict |
|
7
|
|
|
import pandas as pd |
|
8
|
|
|
from configparser import ConfigParser |
|
9
|
|
|
from gensim.corpora import Dictionary |
|
10
|
|
|
from gensim.models.tfidfmodel import TfidfModel |
|
11
|
|
|
|
|
12
|
|
|
from .modeling.dataset_extraction import CategoryToFieldsGenerator |
|
13
|
|
|
from .dataset import TextDataset |
|
14
|
|
|
from topic_modeling_toolkit.processors import Pipeline |
|
15
|
|
|
|
|
16
|
|
|
from .definitions import IDEOLOGY_CLASS_NAME, COOCURENCE_DICT_FILE_NAMES# = ['cooc_tf_', 'cooc_df_', 'ppmi_tf_', 'ppmi_df_'] |
|
17
|
|
|
|
|
18
|
|
|
# import logging |
|
19
|
|
|
# logger = logging.getLogger(__name__) |
|
20
|
|
|
|
|
21
|
|
|
|
|
22
|
|
|
class PipeHandler(object): |
|
23
|
|
|
def __init__(self): |
|
24
|
|
|
self.cat2textgen_proc = None |
|
25
|
|
|
self.text_generator = None |
|
26
|
|
|
self.doc_gen_stats = {} |
|
27
|
|
|
self.dct = None |
|
28
|
|
|
self.corpus = None |
|
29
|
|
|
self.nb_docs = 0 |
|
30
|
|
|
self._pipeline = None |
|
31
|
|
|
self.dataset = None |
|
32
|
|
|
self._collection = '' |
|
33
|
|
|
self.vocab_file = '' |
|
34
|
|
|
self.uci_file = '' |
|
35
|
|
|
self.vowpal_file = '' |
|
36
|
|
|
self.outlet_ids = [] |
|
37
|
|
|
self._pack_data = None |
|
38
|
|
|
self._data_models = {} |
|
39
|
|
|
self._data_model2constructor = {'counts': lambda x: x, |
|
40
|
|
|
'tfidf': TfidfModel} |
|
41
|
|
|
self._vec_gen = {'counts': lambda bow_model: (_ for _ in bow_model), |
|
42
|
|
|
'tfidf': lambda bow_model: (_ for _ in map(lambda x: self._data_models['tfidf'][x], bow_model))} |
|
43
|
|
|
self._format_data_tr = { |
|
44
|
|
|
'uci': lambda x: x[1], |
|
45
|
|
|
'vowpal': lambda x: [map(lambda y: (self.dct[y[0]], y[1]), x[1]), {IDEOLOGY_CLASS_NAME: self.label(self.outlet_ids[x[0]])}] |
|
46
|
|
|
} |
|
47
|
|
|
self._labels_hash = {} |
|
48
|
|
|
|
|
49
|
|
|
@property |
|
50
|
|
|
def labels_hash(self): |
|
51
|
|
|
return self._labels_hash |
|
52
|
|
|
|
|
53
|
|
|
@labels_hash.setter |
|
54
|
|
|
def labels_hash(self, outlet_id2document_label_hash): |
|
55
|
|
|
self._labels_hash = outlet_id2document_label_hash |
|
56
|
|
|
|
|
57
|
|
|
def label(self, outlet_id): |
|
58
|
|
|
return self._labels_hash[outlet_id] |
|
59
|
|
|
|
|
60
|
|
|
@property |
|
61
|
|
|
def labels(self): |
|
62
|
|
|
return [self.label(x) for x in self.outlet_ids] |
|
63
|
|
|
|
|
64
|
|
|
@property |
|
65
|
|
|
def pipeline(self): |
|
66
|
|
|
return self._pipeline |
|
67
|
|
|
@pipeline.setter |
|
68
|
|
|
def pipeline(self, pipeline): |
|
69
|
|
|
"""Set the processing pipeline for the handler to use.\n |
|
70
|
|
|
:param str or processors.pipeline.Pipeline pipeline: |
|
71
|
|
|
""" |
|
72
|
|
|
if type(pipeline) == str: |
|
73
|
|
|
self._pipeline = Pipeline.from_cfg(pipeline) |
|
74
|
|
|
else: |
|
75
|
|
|
self._pipeline = pipeline |
|
76
|
|
|
|
|
77
|
|
|
def process(self, pipeline, category, sample='all', verbose=False): |
|
78
|
|
|
self.pipeline = pipeline |
|
79
|
|
|
if verbose: |
|
80
|
|
|
print(self._pipeline) |
|
81
|
|
|
self.pipe_through_processors(category, num_docs=sample) |
|
82
|
|
|
|
|
83
|
|
|
def persist(self, dataset_path, labels_hash, class_names, add_class_labels_to_vocab=True): |
|
84
|
|
|
self._prepare_storing(dataset_path) |
|
85
|
|
|
self._labels_hash = labels_hash |
|
86
|
|
|
self.pipe_through_disk_writers() |
|
87
|
|
|
self.class_names = class_names |
|
88
|
|
|
self.write_vocab(dataset_path, add_class_labels=add_class_labels_to_vocab) |
|
89
|
|
|
return self.create_dataset(dataset_path) |
|
90
|
|
|
|
|
91
|
|
|
def preprocess(self, category, pipeline, collection_path, labels_hash, class_names, sample='all', add_class_labels_to_vocab=True): |
|
92
|
|
|
self.process(pipeline, category, sample=sample) |
|
93
|
|
|
return self.persist(collection_path, labels_hash, class_names, add_class_labels_to_vocab=add_class_labels_to_vocab) |
|
94
|
|
|
|
|
95
|
|
|
def _prepare_storing(self, dataset_path): |
|
96
|
|
|
self._collection = os.path.basename(dataset_path) |
|
97
|
|
|
self.uci_file = os.path.join(dataset_path, 'docword.{}.txt'.format(self._collection)) |
|
98
|
|
|
self.vowpal_file = os.path.join(dataset_path, 'vowpal.{}.txt'.format(self._collection)) |
|
99
|
|
|
self.pipeline.initialize(file_paths=[self.uci_file, self.vowpal_file]) |
|
100
|
|
|
|
|
101
|
|
|
##### |
|
102
|
|
|
def pipe_through_processors(self, category, num_docs='all'): |
|
103
|
|
|
doc_gens = [] |
|
104
|
|
|
self.outlet_ids = [] |
|
105
|
|
|
self.doc_gen_stats['corpus-tokens'] = 0 |
|
106
|
|
|
self.cat2textgen_proc = CategoryToFieldsGenerator(('text', 'poster_id'), nb_docs=num_docs) |
|
107
|
|
|
self.text_generator = self.cat2textgen_proc.process(category) |
|
108
|
|
|
print(self.cat2textgen_proc, '\n') |
|
109
|
|
|
for i, doc in enumerate(self.text_generator): |
|
110
|
|
|
doc_gens.append(self._pipeline.pipe_through(doc['text'], len(self._pipeline) - 2)) |
|
111
|
|
|
self.outlet_ids.append(str(doc['poster_id'])) # index outlets (document authors) ids |
|
112
|
|
|
|
|
113
|
|
|
self.dct = self._pipeline[self._pipeline.processors_names.index('dict-builder')][1].state |
|
114
|
|
|
# self.corpus = [self.dct.doc2bow([token for token in tok_gen]) for tok_gen in doc_gens] |
|
115
|
|
|
# print '{} tokens in all generators\n'.format(sum_toks) |
|
116
|
|
|
# print 'total bow tuples in corpus: {}'.format(sum(len(_) for _ in self.corpus)) |
|
117
|
|
|
# print "GENSIM-DICT:\nnum_pos (processes words): {}\nnum_nnz (nb of bow-tuples) {}\nvocab size: {}".format(self.dct.num_pos, self.dct.num_nnz, len(self.dct.items())) |
|
118
|
|
|
# print '\nnum_pos', self.dct.num_pos, '\nnum_nnz', self.dct.num_nnz, '\n{} items in dictionary'.format(len(self.dct.items())) |
|
119
|
|
|
print |
|
120
|
|
|
self._print_dict_stats() |
|
121
|
|
|
print("SAMPLE LEXICAL ITEMS:\n{}".format( |
|
122
|
|
|
'\n'.join(map(lambda x: '{}: {}'.format(x[0], x[1]), sorted(self.dct.iteritems(), key=itemgetter(0))[:5])))) |
|
123
|
|
|
|
|
124
|
|
|
tokens = [[token for token in tok_gen] for tok_gen in doc_gens] |
|
125
|
|
|
|
|
126
|
|
|
# print corpus stats before applying 'below' and 'above' filtering |
|
127
|
|
|
c = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens] |
|
128
|
|
|
self._print_bow_model_stats(c) |
|
129
|
|
|
|
|
130
|
|
|
print(' -- filter extremes -- ') |
|
131
|
|
|
self.dct.filter_extremes(no_below=self._pipeline.settings['nobelow'], |
|
132
|
|
|
no_above=self._pipeline.settings['noabove']) |
|
133
|
|
|
self._print_dict_stats() |
|
134
|
|
|
|
|
135
|
|
|
print(' -- compactify -- ') |
|
136
|
|
|
self.dct.compactify() |
|
137
|
|
|
self._print_dict_stats() |
|
138
|
|
|
|
|
139
|
|
|
# self.corpus = filter(None, [self.dct.doc2bow([token for token in tok_gen]) for tok_gen in doc_gens]) |
|
140
|
|
|
self.corpus = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens] |
|
141
|
|
|
self._print_bow_model_stats(self.corpus) |
|
142
|
|
|
|
|
143
|
|
|
# REMOVE EMPTY DOCS |
|
144
|
|
|
c = [_ for _ in self.corpus if _] |
|
145
|
|
|
self.corpus, self.outlet_ids = (list(x) for x in zip(*[[doc, label] for doc, label in zip(self.corpus, self.outlet_ids) if doc])) |
|
|
|
|
|
|
146
|
|
|
assert len(c) == len(self.corpus) == len(self.outlet_ids) |
|
147
|
|
|
self._print_bow_model_stats(self.corpus) |
|
148
|
|
|
print |
|
149
|
|
|
|
|
150
|
|
|
def pipe_through_disk_writers(self): |
|
151
|
|
|
"""Call to pass through the last BaseDiskWriter processors of the pieline. Assumes the last non BaseDsikWriter processor in the pipeline is a 'weight' so that a 'counts 'or 'tfidf' token weight model is computed""" |
|
152
|
|
|
if len(self.corpus) != len(self.outlet_ids): |
|
153
|
|
|
logger.warning("Please fix the logic because there is a missmatch between documents and labels: {} != {}".format(len(self.corpus), len(self.outlet_ids))) |
|
|
|
|
|
|
154
|
|
|
for _, processor in self.pipeline.disk_writers: |
|
155
|
|
|
for i, vector in enumerate(self._get_iterable_data_model(self.pipeline.settings['weight'])): # 'counts' only supported (future work: 'tfidf') |
|
156
|
|
|
processor.process(self._format_data_tr[processor.to_id()]((i, vector))) |
|
157
|
|
|
self.doc_gen_stats.update({'docs-gen': self.cat2textgen_proc.nb_processed, 'docs-failed': len(self.cat2textgen_proc.failed)}) |
|
158
|
|
|
|
|
159
|
|
|
# the first 3 lines of a uci formatted file: correspond to nb_docs, vocab_size, sum of nb of tuples (representing the bow model) found in all documents. |
|
160
|
|
|
# They should be written on the top |
|
161
|
|
|
prologue_lines = map(lambda x: str(x), [self.dct.num_docs, len(self.dct.items()), sum(len(_) for _ in self.corpus)]) |
|
162
|
|
|
self.pipeline.finalize([prologue_lines]) |
|
163
|
|
|
|
|
164
|
|
|
def _get_iterable_data_model(self, data_model): |
|
165
|
|
|
if data_model not in self._data_models: |
|
166
|
|
|
self._data_models[data_model] = self._data_model2constructor[data_model](self.corpus) |
|
167
|
|
|
return self._vec_gen[data_model](self.corpus) |
|
168
|
|
|
|
|
169
|
|
|
####### |
|
170
|
|
|
def write_vocab(self, dataset_path, add_class_labels=True): |
|
171
|
|
|
# Define file and dump the vocabulary (list of unique tokens, one per line) |
|
172
|
|
|
self.vocab_file = os.path.join(dataset_path, 'vocab.{}.txt'.format(os.path.basename(dataset_path))) |
|
173
|
|
|
if not os.path.isfile(self.vocab_file): |
|
174
|
|
|
with open(self.vocab_file, 'w') as f: |
|
175
|
|
|
for string_id, string in self._vocab_tokens_generator(include_class_labels=add_class_labels): |
|
176
|
|
|
try: |
|
177
|
|
|
# f.write('{}\n'.format(string.encode('utf-8'))) |
|
178
|
|
|
f.write('{}\n'.format(string)) |
|
179
|
|
|
except UnicodeEncodeError as e: |
|
180
|
|
|
# f.write('\n'.join(map(lambda x: '{}'.format(str(x[1])), sorted([_ for _ in self.dct.iteritems()], key=itemgetter(0))))) |
|
181
|
|
|
print('FAILED', type(string_id), string) |
|
182
|
|
|
raise e |
|
183
|
|
|
print("Created '{}' file".format(self.vocab_file)) |
|
184
|
|
|
else: |
|
185
|
|
|
print("File '{}' already exists. Skipping.".format(self.vocab_file)) |
|
186
|
|
|
|
|
187
|
|
|
def _vocab_tokens_generator(self, include_class_labels=True): |
|
188
|
|
|
for gram_id, gram_string in self.dct.iteritems(): |
|
189
|
|
|
yield gram_id, gram_string |
|
190
|
|
|
if include_class_labels: |
|
191
|
|
|
for class_label in [_ for _ in self.class_names if _ in set(self.labels)]: |
|
192
|
|
|
yield 'class_modality', '{} {}'.format(class_label, IDEOLOGY_CLASS_NAME) |
|
193
|
|
|
|
|
194
|
|
|
####### |
|
195
|
|
|
def create_dataset(self, dataset_path): |
|
196
|
|
|
dataset = TextDataset(os.path.basename(dataset_path), self._get_dataset_id(), |
|
197
|
|
|
len(self.corpus), len(self.dct.items()), sum(len(_) for _ in self.corpus), |
|
198
|
|
|
self.uci_file, self.vocab_file, self.vowpal_file) |
|
199
|
|
|
dataset.root_dir = dataset_path |
|
200
|
|
|
dataset.save() |
|
201
|
|
|
return dataset |
|
202
|
|
|
|
|
203
|
|
|
def _get_dataset_id(self): |
|
204
|
|
|
idd = self._pipeline.get_id() # get_id(self._pipeline.settings) |
|
205
|
|
|
ri = idd.rfind('_') |
|
206
|
|
|
return str(len(self.corpus)) + '_' + idd[:ri] + '.' + idd[ri + 1:] |
|
207
|
|
|
|
|
208
|
|
|
###### UTILS |
|
209
|
|
|
def _print_dict_stats(self): |
|
210
|
|
|
print("GENSIM-DICT:\nnum_pos (processes words): {}\nnum_nnz (nb of bow-tuples) {}\nvocab size: {}".format( |
|
211
|
|
|
self.dct.num_pos, self.dct.num_nnz, len(self.dct.items()))) |
|
212
|
|
|
|
|
213
|
|
|
@classmethod |
|
214
|
|
|
def _print_bow_model_stats(cls, bow_corpus): |
|
215
|
|
|
print("BOW-MODEL:\nnumber of word position (num_pos): {}\ntotal number of tuples (num_nnz): {}\n number of docs: {}\nempty docs: {}".format( |
|
216
|
|
|
sum(sum(bow_tuple[1] for bow_tuple in doc) for doc in bow_corpus), sum(len(_) for _ in bow_corpus), len(bow_corpus), len([_ for _ in bow_corpus if not _]))) |
|
217
|
|
|
|