patm.pipe_handler.PipeHandler.preprocess() (rating: A)

Complexity
  Conditions: 1

Size
  Total Lines: 3
  Code Lines: 3

Duplication
  Lines: 0
  Ratio: 0%

Importance
  Changes: 0
Metric  Value
cc      1
eloc    3
nop     8
dl      0
loc     3
rs      10
c       0
b       0
f       0

How to fix: Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as callers need more, or different, data.

There are several approaches to avoiding long parameter lists, such as grouping related arguments into a parameter object, splitting the method into smaller ones, or moving rarely-varying arguments into object state; a sketch of the parameter-object approach follows.
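Here, preprocess() takes 8 parameters (the nop metric above) and merely forwards them to process() and persist(). Below is a minimal sketch of the parameter-object approach, assuming a dataclass is acceptable; the PreprocessSpec name, types, and field comments are illustrative, not part of the analyzed code:

from dataclasses import dataclass
from typing import Dict, List

@dataclass
class PreprocessSpec:
    """Hypothetical parameter object grouping the arguments of PipeHandler.preprocess()."""
    category: str                    # category of documents to extract
    pipeline: str                    # path to a pipeline cfg file (or a Pipeline instance)
    collection_path: str             # directory where the dataset artifacts are written
    labels_hash: Dict[str, str]      # outlet id -> document class label
    class_names: List[str]           # document class labels to register
    sample: str = 'all'              # or a numeric cap on the number of documents
    add_class_labels_to_vocab: bool = True

def preprocess(self, spec):
    """Variant of PipeHandler.preprocess() (meant to live on PipeHandler) taking a single parameter object."""
    self.process(spec.pipeline, spec.category, sample=spec.sample)
    return self.persist(spec.collection_path, spec.labels_hash, spec.class_names,
                        add_class_labels_to_vocab=spec.add_class_labels_to_vocab)

Callers then construct one PreprocessSpec and pass it along, so adding an option means changing one class rather than every call site. For reference, the full analyzed source follows.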

import os
import re
import sys
import argparse
import logging
from operator import itemgetter
from collections import OrderedDict
import pandas as pd
from configparser import ConfigParser
from gensim.corpora import Dictionary
from gensim.models.tfidfmodel import TfidfModel

from .modeling.dataset_extraction import CategoryToFieldsGenerator
from .dataset import TextDataset
from topic_modeling_toolkit.processors import Pipeline

from .definitions import IDEOLOGY_CLASS_NAME, COOCURENCE_DICT_FILE_NAMES  # = ['cooc_tf_', 'cooc_df_', 'ppmi_tf_', 'ppmi_df_']

logger = logging.getLogger(__name__)  # module-level logger; used by pipe_through_disk_writers()


class PipeHandler(object):
    def __init__(self):
        self.cat2textgen_proc = None
        self.text_generator = None
        self.doc_gen_stats = {}
        self.dct = None      # gensim Dictionary, built by the pipeline's 'dict-builder' processor
        self.corpus = None   # bag-of-words corpus
        self.nb_docs = 0
        self._pipeline = None
        self.dataset = None
        self._collection = ''
        self.vocab_file = ''
        self.uci_file = ''
        self.vowpal_file = ''
        self.outlet_ids = []
        self._pack_data = None
        self._data_models = {}
        # weight-model name -> constructor of that model
        self._data_model2constructor = {'counts': lambda x: x,
                                        'tfidf': TfidfModel}
        # weight-model name -> generator of per-document vectors
        self._vec_gen = {'counts': lambda bow_model: (_ for _ in bow_model),
                         'tfidf': lambda bow_model: (_ for _ in map(lambda x: self._data_models['tfidf'][x], bow_model))}
        # disk-writer id -> transformation applied to each (doc_index, bow_vector) pair before writing
        self._format_data_tr = {
            'uci': lambda x: x[1],
            'vowpal': lambda x: [map(lambda y: (self.dct[y[0]], y[1]), x[1]), {IDEOLOGY_CLASS_NAME: self.label(self.outlet_ids[x[0]])}]
        }
        self._labels_hash = {}

    @property
    def labels_hash(self):
        return self._labels_hash

    @labels_hash.setter
    def labels_hash(self, outlet_id2document_label_hash):
        self._labels_hash = outlet_id2document_label_hash

    def label(self, outlet_id):
        return self._labels_hash[outlet_id]

    @property
    def labels(self):
        return [self.label(x) for x in self.outlet_ids]

    @property
    def pipeline(self):
        return self._pipeline

    @pipeline.setter
    def pipeline(self, pipeline):
        """Set the processing pipeline for the handler to use.

        :param str or processors.pipeline.Pipeline pipeline: a cfg file path, or a ready Pipeline instance
        """
        if isinstance(pipeline, str):
            self._pipeline = Pipeline.from_cfg(pipeline)
        else:
            self._pipeline = pipeline

    def process(self, pipeline, category, sample='all', verbose=False):
        self.pipeline = pipeline
        if verbose:
            print(self._pipeline)
        self.pipe_through_processors(category, num_docs=sample)

    def persist(self, dataset_path, labels_hash, class_names, add_class_labels_to_vocab=True):
        self._prepare_storing(dataset_path)
        self._labels_hash = labels_hash
        self.pipe_through_disk_writers()
        self.class_names = class_names
        self.write_vocab(dataset_path, add_class_labels=add_class_labels_to_vocab)
        return self.create_dataset(dataset_path)

    def preprocess(self, category, pipeline, collection_path, labels_hash, class_names, sample='all', add_class_labels_to_vocab=True):
        self.process(pipeline, category, sample=sample)
        return self.persist(collection_path, labels_hash, class_names, add_class_labels_to_vocab=add_class_labels_to_vocab)

    def _prepare_storing(self, dataset_path):
        self._collection = os.path.basename(dataset_path)
        self.uci_file = os.path.join(dataset_path, 'docword.{}.txt'.format(self._collection))
        self.vowpal_file = os.path.join(dataset_path, 'vowpal.{}.txt'.format(self._collection))
        self.pipeline.initialize(file_paths=[self.uci_file, self.vowpal_file])

    #####
    def pipe_through_processors(self, category, num_docs='all'):
        doc_gens = []
        self.outlet_ids = []
        self.doc_gen_stats['corpus-tokens'] = 0
        self.cat2textgen_proc = CategoryToFieldsGenerator(('text', 'poster_id'), nb_docs=num_docs)
        self.text_generator = self.cat2textgen_proc.process(category)
        print(self.cat2textgen_proc, '\n')
        for i, doc in enumerate(self.text_generator):
            doc_gens.append(self._pipeline.pipe_through(doc['text'], len(self._pipeline) - 2))
            self.outlet_ids.append(str(doc['poster_id']))  # index outlet (document author) ids

        self.dct = self._pipeline[self._pipeline.processors_names.index('dict-builder')][1].state
        # self.corpus = [self.dct.doc2bow([token for token in tok_gen]) for tok_gen in doc_gens]
        # print '{} tokens in all generators\n'.format(sum_toks)
        # print 'total bow tuples in corpus: {}'.format(sum(len(_) for _ in self.corpus))
        print()
        self._print_dict_stats()
        print("SAMPLE LEXICAL ITEMS:\n{}".format(
            '\n'.join(map(lambda x: '{}: {}'.format(x[0], x[1]), sorted(self.dct.items(), key=itemgetter(0))[:5]))))

        tokens = [[token for token in tok_gen] for tok_gen in doc_gens]

        # print corpus stats before applying 'no_below' and 'no_above' filtering
        c = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens]
        self._print_bow_model_stats(c)

        print(' -- filter extremes -- ')
        self.dct.filter_extremes(no_below=self._pipeline.settings['nobelow'],
                                 no_above=self._pipeline.settings['noabove'])
        self._print_dict_stats()

        print(' -- compactify -- ')
        self.dct.compactify()
        self._print_dict_stats()

        self.corpus = [self.dct.doc2bow(doc_tokens) for doc_tokens in tokens]
        self._print_bow_model_stats(self.corpus)

        # REMOVE EMPTY DOCS: keep only non-empty bow vectors, and keep the outlet labels aligned with them
        c = [_ for _ in self.corpus if _]
        self.corpus, self.outlet_ids = (list(x) for x in zip(*[[doc, label] for doc, label in zip(self.corpus, self.outlet_ids) if doc]))
        assert len(c) == len(self.corpus) == len(self.outlet_ids)
        self._print_bow_model_stats(self.corpus)
        print()

    def pipe_through_disk_writers(self):
        """Pass through the last BaseDiskWriter processors of the pipeline.

        Assumes the last non-BaseDiskWriter processor in the pipeline is a 'weight' processor, so that a 'counts' or 'tfidf' token weight model has been computed.
        """
        if len(self.corpus) != len(self.outlet_ids):
            logger.warning("Please fix the logic because there is a mismatch between documents and labels: {} != {}".format(len(self.corpus), len(self.outlet_ids)))
        for _, processor in self.pipeline.disk_writers:
            for i, vector in enumerate(self._get_iterable_data_model(self.pipeline.settings['weight'])):  # only 'counts' is supported (future work: 'tfidf')
                processor.process(self._format_data_tr[processor.to_id()]((i, vector)))
        self.doc_gen_stats.update({'docs-gen': self.cat2textgen_proc.nb_processed, 'docs-failed': len(self.cat2textgen_proc.failed)})

        # The first 3 lines of a UCI-formatted file correspond to nb_docs, vocab_size, and the total number of
        # bow tuples found in all documents; they must be written at the top of the file.
        prologue_lines = [str(x) for x in (self.dct.num_docs, len(self.dct.items()), sum(len(_) for _ in self.corpus))]
        self.pipeline.finalize([prologue_lines])

    def _get_iterable_data_model(self, data_model):
        if data_model not in self._data_models:
            self._data_models[data_model] = self._data_model2constructor[data_model](self.corpus)
        return self._vec_gen[data_model](self.corpus)

    #######
    def write_vocab(self, dataset_path, add_class_labels=True):
        # Define the file and dump the vocabulary (list of unique tokens, one per line)
        self.vocab_file = os.path.join(dataset_path, 'vocab.{}.txt'.format(os.path.basename(dataset_path)))
        if not os.path.isfile(self.vocab_file):
            with open(self.vocab_file, 'w') as f:
                for string_id, string in self._vocab_tokens_generator(include_class_labels=add_class_labels):
                    try:
                        # f.write('{}\n'.format(string.encode('utf-8')))
                        f.write('{}\n'.format(string))
                    except UnicodeEncodeError as e:
                        # f.write('\n'.join(map(lambda x: '{}'.format(str(x[1])), sorted([_ for _ in self.dct.iteritems()], key=itemgetter(0)))))
                        print('FAILED', type(string_id), string)
                        raise e
                print("Created '{}' file".format(self.vocab_file))
        else:
            print("File '{}' already exists. Skipping.".format(self.vocab_file))

    def _vocab_tokens_generator(self, include_class_labels=True):
        for gram_id, gram_string in self.dct.items():
            yield gram_id, gram_string
        if include_class_labels:
            for class_label in [_ for _ in self.class_names if _ in set(self.labels)]:
                yield 'class_modality', '{} {}'.format(class_label, IDEOLOGY_CLASS_NAME)

    #######
    def create_dataset(self, dataset_path):
        dataset = TextDataset(os.path.basename(dataset_path), self._get_dataset_id(),
                              len(self.corpus), len(self.dct.items()), sum(len(_) for _ in self.corpus),
                              self.uci_file, self.vocab_file, self.vowpal_file)
        dataset.root_dir = dataset_path
        dataset.save()
        return dataset

    def _get_dataset_id(self):
        idd = self._pipeline.get_id()  # get_id(self._pipeline.settings)
        ri = idd.rfind('_')
        return str(len(self.corpus)) + '_' + idd[:ri] + '.' + idd[ri + 1:]

    ###### UTILS
    def _print_dict_stats(self):
        print("GENSIM-DICT:\nnum_pos (processed words): {}\nnum_nnz (nb of bow-tuples): {}\nvocab size: {}".format(
            self.dct.num_pos, self.dct.num_nnz, len(self.dct.items())))

    @classmethod
    def _print_bow_model_stats(cls, bow_corpus):
        print("BOW-MODEL:\nnumber of word positions (num_pos): {}\ntotal number of tuples (num_nnz): {}\nnumber of docs: {}\nempty docs: {}".format(
            sum(sum(bow_tuple[1] for bow_tuple in doc) for doc in bow_corpus), sum(len(_) for _ in bow_corpus), len(bow_corpus), len([_ for _ in bow_corpus if not _])))
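For context, a hypothetical end-to-end invocation of the 8-parameter method under review; the paths, category name, and label mapping below are made up for illustration:

from patm.pipe_handler import PipeHandler

handler = PipeHandler()
dataset = handler.preprocess(
    'posts',                                  # category to extract documents from
    '/data/pipelines/preprocess.cfg',         # pipeline cfg, parsed via Pipeline.from_cfg()
    '/data/collections/my_collection',        # collection_path: docword.*, vocab.* and vowpal.* files land here
    {'outlet_1': 'liberal', 'outlet_2': 'conservative'},  # labels_hash: outlet id -> class label
    ['liberal', 'conservative'],              # class_names
    sample='all',
    add_class_labels_to_vocab=True,
)
print(dataset.root_dir)

Every call site has to line up all eight arguments in order, which is exactly the readability cost the Many Parameters check flags.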