patm.pipe_handler.PipeHandler.persist()   Rating: A
last analyzed

Complexity
    Conditions: 1

Size
    Total Lines: 7
    Code Lines: 7

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric   Value
cc       1
eloc     7
nop      5
dl       0
loc      7
rs       10
c        0
b        0
f        0
import os
import re
import sys
import argparse
from operator import itemgetter
from collections import OrderedDict
import pandas as pd
from configparser import ConfigParser
from gensim.corpora import Dictionary
from gensim.models.tfidfmodel import TfidfModel

from .modeling.dataset_extraction import CategoryToFieldsGenerator
from .dataset import TextDataset
from topic_modeling_toolkit.processors import Pipeline

from .definitions import IDEOLOGY_CLASS_NAME, COOCURENCE_DICT_FILE_NAMES  # = ['cooc_tf_', 'cooc_df_', 'ppmi_tf_', 'ppmi_df_']

import logging
logger = logging.getLogger(__name__)


class PipeHandler(object):
    def __init__(self):
        self.cat2textgen_proc = None
        self.text_generator = None
        self.doc_gen_stats = {}
        self.dct = None
        self.corpus = None
        self.nb_docs = 0
        self._pipeline = None
        self.dataset = None
        self._collection = ''
        self.vocab_file = ''
        self.uci_file = ''
        self.vowpal_file = ''
        self.outlet_ids = []
        self._pack_data = None
        self._data_models = {}
        self._data_model2constructor = {'counts': lambda x: x,
                                        'tfidf': TfidfModel}
        self._vec_gen = {'counts': lambda bow_model: (_ for _ in bow_model),
                         'tfidf': lambda bow_model: (_ for _ in map(lambda x: self._data_models['tfidf'][x], bow_model))}
        self._format_data_tr = {
            'uci': lambda x: x[1],
            'vowpal': lambda x: [map(lambda y: (self.dct[y[0]], y[1]), x[1]), {IDEOLOGY_CLASS_NAME: self.label(self.outlet_ids[x[0]])}]
        }
        self._labels_hash = {}

    @property
    def labels_hash(self):
        return self._labels_hash

    @labels_hash.setter
    def labels_hash(self, outlet_id2document_label_hash):
        self._labels_hash = outlet_id2document_label_hash

    def label(self, outlet_id):
        return self._labels_hash[outlet_id]

    @property
    def labels(self):
        return [self.label(x) for x in self.outlet_ids]

    @property
    def pipeline(self):
        return self._pipeline

    @pipeline.setter
    def pipeline(self, pipeline):
        """Set the processing pipeline for the handler to use.

        :param str or processors.pipeline.Pipeline pipeline: path to a pipeline .cfg file or a Pipeline instance
        """
        if type(pipeline) == str:
            self._pipeline = Pipeline.from_cfg(pipeline)
        else:
            self._pipeline = pipeline

    def process(self, pipeline, category, sample='all', verbose=False):
        self.pipeline = pipeline
        if verbose:
            print(self._pipeline)
        self.pipe_through_processors(category, num_docs=sample)

    def persist(self, dataset_path, labels_hash, class_names, add_class_labels_to_vocab=True):
        self._prepare_storing(dataset_path)
        self._labels_hash = labels_hash
        self.pipe_through_disk_writers()
        self.class_names = class_names
        self.write_vocab(dataset_path, add_class_labels=add_class_labels_to_vocab)
        return self.create_dataset(dataset_path)

    def preprocess(self, category, pipeline, collection_path, labels_hash, class_names, sample='all', add_class_labels_to_vocab=True):
        self.process(pipeline, category, sample=sample)
        return self.persist(collection_path, labels_hash, class_names, add_class_labels_to_vocab=add_class_labels_to_vocab)

    def _prepare_storing(self, dataset_path):
        self._collection = os.path.basename(dataset_path)
        self.uci_file = os.path.join(dataset_path, 'docword.{}.txt'.format(self._collection))
        self.vowpal_file = os.path.join(dataset_path, 'vowpal.{}.txt'.format(self._collection))
        self.pipeline.initialize(file_paths=[self.uci_file, self.vowpal_file])

    #####
    def pipe_through_processors(self, category, num_docs='all'):
        doc_gens = []
        self.outlet_ids = []
        self.doc_gen_stats['corpus-tokens'] = 0
        self.cat2textgen_proc = CategoryToFieldsGenerator(('text', 'poster_id'), nb_docs=num_docs)
        self.text_generator = self.cat2textgen_proc.process(category)
        print(self.cat2textgen_proc, '\n')
        for i, doc in enumerate(self.text_generator):
            doc_gens.append(self._pipeline.pipe_through(doc['text'], len(self._pipeline) - 2))
            self.outlet_ids.append(str(doc['poster_id']))  # index outlet (document author) ids

        self.dct = self._pipeline[self._pipeline.processors_names.index('dict-builder')][1].state
        # self.corpus = [self.dct.doc2bow([token for token in tok_gen]) for tok_gen in doc_gens]
        # print('{} tokens in all generators\n'.format(sum_toks))
        # print('total bow tuples in corpus: {}'.format(sum(len(_) for _ in self.corpus)))
        # print("GENSIM-DICT:\nnum_pos (processed words): {}\nnum_nnz (nb of bow-tuples): {}\nvocab size: {}".format(self.dct.num_pos, self.dct.num_nnz, len(self.dct.items())))
        # print('\nnum_pos', self.dct.num_pos, '\nnum_nnz', self.dct.num_nnz, '\n{} items in dictionary'.format(len(self.dct.items())))
        print()
        self._print_dict_stats()
        print("SAMPLE LEXICAL ITEMS:\n{}".format(
            '\n'.join(map(lambda x: '{}: {}'.format(x[0], x[1]), sorted(self.dct.iteritems(), key=itemgetter(0))[:5]))))

        tokens = [[token for token in tok_gen] for tok_gen in doc_gens]

        # print corpus stats before applying 'below' and 'above' filtering
        c = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens]
        self._print_bow_model_stats(c)

        print(' -- filter extremes -- ')
        self.dct.filter_extremes(no_below=self._pipeline.settings['nobelow'],
                                 no_above=self._pipeline.settings['noabove'])
        self._print_dict_stats()

        print(' -- compactify -- ')
        self.dct.compactify()
        self._print_dict_stats()

        # self.corpus = filter(None, [self.dct.doc2bow([token for token in tok_gen]) for tok_gen in doc_gens])
        self.corpus = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens]
        self._print_bow_model_stats(self.corpus)

        # REMOVE EMPTY DOCS
        c = [_ for _ in self.corpus if _]
        self.corpus, self.outlet_ids = (list(x) for x in zip(*[[doc, label] for doc, label in zip(self.corpus, self.outlet_ids) if doc]))
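        # Added illustrative comment (not in the original source): for example, if self.corpus were
        # [[(0, 2)], [], [(1, 1)]] and self.outlet_ids were ['a', 'b', 'c'], the statement above keeps only
        # the non-empty documents, leaving self.corpus == [[(0, 2)], [(1, 1)]] and self.outlet_ids == ['a', 'c'].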
        assert len(c) == len(self.corpus) == len(self.outlet_ids)
        self._print_bow_model_stats(self.corpus)
        print()

    def pipe_through_disk_writers(self):
        """Call to pass through the last BaseDiskWriter processors of the pipeline. Assumes the last non-BaseDiskWriter processor in the pipeline is a 'weight' one, so that a 'counts' or 'tfidf' token weight model is computed."""
        if len(self.corpus) != len(self.outlet_ids):
            logger.warning("Please fix the logic because there is a mismatch between documents and labels: {} != {}".format(len(self.corpus), len(self.outlet_ids)))
        for _, processor in self.pipeline.disk_writers:
            for i, vector in enumerate(self._get_iterable_data_model(self.pipeline.settings['weight'])):  # only 'counts' supported (future work: 'tfidf')
                processor.process(self._format_data_tr[processor.to_id()]((i, vector)))
        self.doc_gen_stats.update({'docs-gen': self.cat2textgen_proc.nb_processed, 'docs-failed': len(self.cat2textgen_proc.failed)})

        # The first 3 lines of a uci-formatted file correspond to nb_docs, vocab_size and the sum of the
        # numbers of bow tuples found in all documents. They must be written at the top of the file.
        prologue_lines = [str(x) for x in (self.dct.num_docs, len(self.dct.items()), sum(len(_) for _ in self.corpus))]
        self.pipeline.finalize([prologue_lines])

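    # Added illustrative comment (not in the original source): assuming the disk writer emits the standard
    # UCI bag-of-words layout, a freshly written docword.<collection>.txt would start with the three prologue
    # values above (number of documents, vocabulary size, total number of bow tuples), e.g.
    #
    #   1000
    #   2500
    #   150000
    #
    # followed by one "docID wordID count" triple per line; the numbers shown here are made up.
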
    def _get_iterable_data_model(self, data_model):
        if data_model not in self._data_models:
            self._data_models[data_model] = self._data_model2constructor[data_model](self.corpus)
        return self._vec_gen[data_model](self.corpus)

    #######
    def write_vocab(self, dataset_path, add_class_labels=True):
        # Define file and dump the vocabulary (list of unique tokens, one per line)
        self.vocab_file = os.path.join(dataset_path, 'vocab.{}.txt'.format(os.path.basename(dataset_path)))
        if not os.path.isfile(self.vocab_file):
            with open(self.vocab_file, 'w') as f:
                for string_id, string in self._vocab_tokens_generator(include_class_labels=add_class_labels):
                    try:
                        # f.write('{}\n'.format(string.encode('utf-8')))
                        f.write('{}\n'.format(string))
                    except UnicodeEncodeError as e:
                        # f.write('\n'.join(map(lambda x: '{}'.format(str(x[1])), sorted([_ for _ in self.dct.iteritems()], key=itemgetter(0)))))
                        print('FAILED', type(string_id), string)
                        raise e
                print("Created '{}' file".format(self.vocab_file))
        else:
            print("File '{}' already exists. Skipping.".format(self.vocab_file))

    def _vocab_tokens_generator(self, include_class_labels=True):
        for gram_id, gram_string in self.dct.iteritems():
            yield gram_id, gram_string
        if include_class_labels:
            for class_label in [_ for _ in self.class_names if _ in set(self.labels)]:
                yield 'class_modality', '{} {}'.format(class_label, IDEOLOGY_CLASS_NAME)

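    # Added illustrative comment (not in the original source): each yielded string above is written on its own
    # line of vocab.<collection>.txt, so the file holds plain tokens followed by one line per used class label
    # tagged with the ideology modality. The token/label values below are made up, and <IDEOLOGY_CLASS_NAME>
    # stands for the value of that constant:
    #
    #   economy
    #   parliament
    #   liberal <IDEOLOGY_CLASS_NAME>
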
    #######
    def create_dataset(self, dataset_path):
        dataset = TextDataset(os.path.basename(dataset_path), self._get_dataset_id(),
                              len(self.corpus), len(self.dct.items()), sum(len(_) for _ in self.corpus),
                              self.uci_file, self.vocab_file, self.vowpal_file)
        dataset.root_dir = dataset_path
        dataset.save()
        return dataset

    def _get_dataset_id(self):
        idd = self._pipeline.get_id()  # get_id(self._pipeline.settings)
        ri = idd.rfind('_')
        return str(len(self.corpus)) + '_' + idd[:ri] + '.' + idd[ri + 1:]

    ###### UTILS
    def _print_dict_stats(self):
        print("GENSIM-DICT:\nnum_pos (processed words): {}\nnum_nnz (nb of bow-tuples): {}\nvocab size: {}".format(
            self.dct.num_pos, self.dct.num_nnz, len(self.dct.items())))

    @classmethod
    def _print_bow_model_stats(cls, bow_corpus):
        print("BOW-MODEL:\nnumber of word positions (num_pos): {}\ntotal number of tuples (num_nnz): {}\nnumber of docs: {}\nempty docs: {}".format(
            sum(sum(bow_tuple[1] for bow_tuple in doc) for doc in bow_corpus), sum(len(_) for _ in bow_corpus), len(bow_corpus), len([_ for _ in bow_corpus if not _])))
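
For context, below is a minimal usage sketch of the analyzed persist() entry point, driven through preprocess(). It assumes a pipeline .cfg file, a writable collection directory and an outlet-id-to-label mapping already exist; the paths, category name, ids, labels and class names are made-up placeholders, not values taken from the project.

import os

from patm.pipe_handler import PipeHandler

pipe_handler = PipeHandler()

# Hypothetical inputs (placeholders): a pipeline config parsed by Pipeline.from_cfg via the
# `pipeline` setter, a category of documents to extract, the directory whose basename becomes
# the collection name, and the label data used for the vowpal file and the vocabulary.
pipeline_cfg = '/data/pipelines/preprocess.cfg'
collection_dir = '/data/collections/my_collection'
labels_hash = {'1001': 'liberal', '1002': 'conservative'}   # outlet (poster) id -> document class label
class_names = ['liberal', 'conservative']

os.makedirs(collection_dir, exist_ok=True)

# process() pipes the category's documents through the tokenization/dict-building processors;
# persist() then writes the docword/vowpal/vocab files into collection_dir and returns a TextDataset.
dataset = pipe_handler.preprocess('posts', pipeline_cfg, collection_dir, labels_hash, class_names,
                                  sample='all', add_class_labels_to_vocab=True)
print(dataset.root_dir)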
217