GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( b17b99...6546a6 )
by Anca
20:31
created

crowdtruth.load   F

Complexity

Total Complexity 64

Size/Duplication

Total Lines 320
Duplicated Lines 0 %

Test Coverage

Coverage 85.53%

Importance

Changes 0
Metric Value
wmc 64
eloc 191
dl 0
loc 320
ccs 136
cts 159
cp 0.8553
rs 3.28
c 0
b 0
f 0

11 Functions

Rating   Name   Duplication   Size   Complexity  
A validate_timestamp_field() 0 8 2
A create_ordered_counter() 0 6 3
A load() 0 36 3
B list_files() 0 22 8
B get_file_list() 0 17 6
B add_missing_values() 0 12 6
D remove_empty_rows() 0 32 12
A remove_single_judgment_units() 0 14 4
A aggregate_annotations() 0 10 5
B process_file() 0 102 6
C make_output_cols_safe_keys() 0 20 9

How to fix   Complexity   

Complexity

Complex classes like crowdtruth.load often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#pylint: disable=W0223
2
3 1
"""
4
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
5
"""
6
7 1
import os
8
9 1
import logging
10 1
import datetime
11 1
from collections import Counter, OrderedDict
12 1
import dateparser
13
14 1
import pandas as pd
15
16 1
from crowdtruth.models.worker import Worker
17 1
from crowdtruth.models.unit import Unit
18 1
from crowdtruth.models.job import Job
19 1
from crowdtruth.configuration import DefaultConfig
20 1
from crowdtruth.crowd_platform import *
21
22
23
24
25
# An ordered counter lets us keep the position of tags
# in the order they were annotated.
class OrderedCounter(Counter, OrderedDict):
    """ A Counter that preserves the insertion order of its keys. """
30
31 1
def create_ordered_counter(ordered_counter, annotation_vector):
    """ Instantiates an ordered counter from a given annotation vector.

    Every annotation from the vector that is missing in the counter is
    inserted with a count of zero, so the counter covers the full vector.
    """
    for annotation in annotation_vector:
        if annotation in ordered_counter:
            continue
        ordered_counter.update({annotation: 0})
    return ordered_counter
37
38 1
def validate_timestamp_field(date_string, date_format):
    """ Validates the time columns (started time and submitted time) in input files.

    Raises:
        ValueError: if date_string does not match date_format.
    """
    try:
        # only validate; the parsed value is not needed (removed a leftover
        # debug print that fired once per validated row)
        datetime.datetime.strptime(date_string, date_format)
    except ValueError as err:
        # chain the original error so the offending value is not lost
        raise ValueError('Incorrect date format') from err
46
47 1
def get_file_list(directory):
    """ Recursively list the input .csv files in the directory given as argument.

    Returns file names relative to *directory*: entries found in
    sub-directories are prefixed with the sub-directory path, so callers
    can join each name back onto *directory*. 'groundtruth.csv' is skipped.
    (Previously sub-directory results were appended as one nested list and
    lost their path prefix, which broke the caller's path join.)
    """
    filelist = []

    # go through all entries in this folder
    for entry in os.listdir(directory):
        path = directory + '/' + entry
        # if it is a folder, scan it recursively and keep relative paths
        if os.path.isdir(path):
            filelist.extend(entry + '/' + sub for sub in get_file_list(path))
        # if it is a csv file (except the ground truth), collect it
        if entry.endswith('.csv') and entry != 'groundtruth.csv':
            filelist.append(entry)
    return filelist
64
65 1
def list_files(kwargs, results, config):
    """ Creates the list of files to be processed and processes each one.

    Args:
        kwargs: loader keyword arguments; either 'file' (a single .csv path)
            or 'directory' (a folder scanned with get_file_list) is required.
        results: dict of lists ('jobs', 'units', ...) that each file's
            processed frames are appended to.
        config: configuration passed through to process_file.

    Returns:
        The updated results dict.

    Raises:
        ValueError: when neither a .csv 'file' nor a 'directory' is given.
    """
    files = []
    directory = ""
    if 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        # lazy %-style args instead of eager string concatenation
        logging.info('Found %s files', len(files))
    else:
        raise ValueError('No input was provided')

    for file in files:
        # directory is non-empty only in the 'directory' branch above
        # (the previous "'directory' in locals()" check was always true)
        if directory:
            logging.info('Processing %s', file)
            file = directory + "/" + file
        res, config = process_file(file, config)
        for value in res:
            results[value].append(res[value])

    return results
87
88 1
def load(**kwargs):
    """ Loads the input files and aggregates the results across jobs. """

    # placeholder for aggregated results, one list per result kind
    results = {kind: [] for kind in ('jobs', 'units', 'workers', 'judgments', 'annotations')}

    if 'config' in kwargs:
        logging.info('Config loaded')
        config = kwargs['config']
    else:
        config = DefaultConfig()

    results = list_files(kwargs, results, config)

    for kind in results:
        results[kind] = pd.concat(results[kind])

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    worker_groups = results['workers'].groupby(results['workers'].index)
    results['workers'] = worker_groups.agg({
        'unit': 'sum',
        'judgment': 'sum',
        'job': 'count',
        'duration': 'mean',
    })

    # aggregate annotations
    annotation_groups = results['annotations'].groupby(results['annotations'].index)
    results['annotations'] = annotation_groups.sum()

    return results, config
124
125 1
def remove_empty_rows(config, judgments):
    """ Handle rows where the worker did not give an answer (AMT issue).

    Depending on ``config.remove_empty_rows``, empty values in the output
    columns are either replaced with ``config.none_token`` (rows kept) or
    the affected rows are dropped and the frame re-indexed from zero.
    Returns the (possibly filtered) judgments frame.
    """

    # if config keeps empty rows, add the NONE placeholder token
    if not config.remove_empty_rows:
        # NOTE(review): the positional loop below addresses rows by label,
        # so it assumes a default RangeIndex (0..n-1) — confirm upstream.
        for col in config.outputColumns:
            for idx in range(len(judgments[col])):
                value = judgments.at[idx, col]
                # pd.isnull already covers None; 'nan' strings come from CSV parsing
                if pd.isnull(value) or value == '' or value == 'nan':
                    logging.info('judgments[%s][%s] is None', idx, col)
                    judgments.at[idx, col] = config.none_token
    # remove empty rows
    else:
        empty_rows = set()
        for col in config.outputColumns:
            # idiomatic boolean masks instead of "== True" / "== False"
            empty_rows |= set(judgments[judgments[col].isna()].index)
            empty_rows |= set(judgments[judgments[col] == 'nan'].index)
        for col in config.outputColumns:
            judgments = judgments[judgments[col].notna()]
            judgments = judgments[judgments[col] != 'nan']
        judgments = judgments.reset_index(drop=True)
        count_empty_rows = len(empty_rows)
        if count_empty_rows == 1:
            logging.warning("%s row with incomplete information in "
                            "output columns was removed.", count_empty_rows)
        elif count_empty_rows > 1:
            logging.warning("%s rows with incomplete information in "
                            "output columns were removed.", count_empty_rows)
    return judgments
157
158 1
def remove_single_judgment_units(judgments):
    """ remove units with just 1 judgment """
    # units annotated by a single worker; agreement is undefined for them
    single_worker_units = judgments.groupby('unit').filter(lambda grp: len(grp) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(single_worker_units)]
    judgments = judgments.reset_index(drop=True)
    removed_count = len(single_worker_units)
    if removed_count == 1:
        logging.warning(str(removed_count) + " Media Unit that was annotated by only"
                        " 1 Worker was omitted, since agreement cannot be calculated.")
    elif removed_count > 1:
        logging.warning(str(removed_count) + " Media Units that were annotated by only"
                        " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments
172
173 1
def make_output_cols_safe_keys(config, judgments):
    """ make output values safe keys

    Turns every output column into OrderedCounter values; for closed tasks
    the full annotation vector is added with zero counts.
    """
    separator = config.annotation_separator
    for col in config.output.values():
        # the first cell decides how the whole column is stored
        if isinstance(judgments[col].iloc[0], dict):
            logging.info("Values stored as dictionary")
            parse = OrderedCounter
        else:
            logging.info("Values not stored as dictionary")
            parse = lambda raw: OrderedCounter(raw.split(separator))
        if config.open_ended_task:
            judgments[col] = judgments[col].apply(parse)
        else:
            judgments[col] = judgments[col].apply(
                lambda raw: create_ordered_counter(parse(raw), config.annotation_vector))
    return judgments
193
194
195 1
def add_missing_values(config, units):
    """ Adds missing vector values if is a closed task.

    For every output column, each relation of ``config.annotation_vector``
    that is absent from a unit's counter is inserted with count 0. Columns
    whose values do not support ``update`` (open-ended tasks) raise
    AttributeError and are skipped. All output columns are processed (the
    loop previously returned after the first successful column) and *units*
    is always returned (previously None when every column raised).
    """
    for col in config.output.values():
        try:
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
        except AttributeError:
            # value is not a counter/dict (open-ended task) -- skip column
            continue
    return units
207 1
def aggregate_annotations(config, judgments):
    """ Aggregates annotations and adds judgments stats.

    Adds per-judgment '<col>.count' and '<col>.unique' columns and returns
    the per-annotation totals together with the enriched judgments frame.
    """
    annotations = pd.DataFrame()
    for col in config.output.values():
        counters = judgments[col]
        judgments[col + '.count'] = counters.apply(lambda counter: sum(counter.values()))
        judgments[col + '.unique'] = counters.apply(len)
        # tally how many judgments mention each annotation key
        tallies = counters.apply(
            lambda counter: pd.Series(list(counter.keys())).value_counts()).sum()
        annotations = pd.concat([annotations, pd.DataFrame(tallies, columns=[col])], axis=0)
    return annotations, judgments
217
218 1
def process_file(filename, config):
    """ Processes input files with the given configuration.

    Reads the .csv at *filename*, normalizes its columns to the internal
    judgment/unit/worker/started/submitted schema, cleans the judgments and
    aggregates them into units, workers, a job and annotation totals.

    Returns:
        A (results, config) tuple, where results is a dict with keys
        'jobs', 'units', 'workers', 'judgments' and 'annotations', and
        config is the (possibly updated) configuration.

    Raises:
        ValueError: when the platform is custom and
            config.customPlatformColumns does not list exactly 5 columns.
    """

    judgments = pd.read_csv(filename)#, encoding=result['encoding'])

    # get_platform returns False when the csv matches no known platform
    platform = get_platform(judgments)

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        # a custom platform must map exactly these 5 roles
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:

            # map the custom column names onto the internal role names
            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }


    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    #config = []
    # else use the default and select them automatically
    config = get_column_types(judgments, config)

    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # handle empty rows (replace with none_token or drop, per config)
    judgments = remove_empty_rows(config, judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    # single rename map: original column name -> internal name
    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
                       + list(platform.items()))
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    # job id is the file path without the .csv extension
    judgments['job'] = filename.split('.csv')[0]

    # make output values safe keys (turn them into OrderedCounter objects)
    judgments = make_output_cols_safe_keys(config, judgments)

    # remove units with just 1 judgment (agreement is undefined for them)
    judgments = remove_single_judgment_units(judgments)

    # parse timestamps and derive the time spent per judgment in seconds
    judgments['started'] = judgments['started'].apply(lambda x: dateparser.parse(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: dateparser.parse(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations, judgments = aggregate_annotations(config, judgments)

    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments; keep only the derived
    # 'output*'/'metric*' columns plus the platform and bookkeeping columns
    output_cols = [col for col in judgments.columns.values \
                    if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    units = add_missing_values(config, units)

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
320