
crowdtruth.load.get_file_list()
Rating: B

Complexity

Conditions 6

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 7.7305

Importance

Changes 0
Metric   Value
eloc     11
dl       0
loc      17
ccs      7
cts      11
cp       0.6364
rs       8.6666
c        0
b        0
f        0
cc       6
nop      1
crap     7.7305
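For reference, the reported CRAP score can be reproduced from the metrics above, assuming the standard CRAP formula crap = cc^2 * (1 - cp)^3 + cc, where cc is the cyclomatic complexity ("Conditions") and cp the coverage ratio. A minimal sketch:

# Sketch: recompute the reported CRAP score, assuming the standard formula.
cc = 6                               # cyclomatic complexity (Conditions)
cp = 0.6364                          # coverage ratio as reported (cp)
crap = cc ** 2 * (1 - cp) ** 3 + cc  # 36 * 0.3636^3 + 6
print(round(crap, 4))                # 7.7305, matching the crap metric above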
#pylint: disable=W0223

"""
Module used to process and load the input files to be evaluated with the CrowdTruth metrics.
"""

import os

import logging
import datetime
from collections import Counter, OrderedDict
import dateparser

import pandas as pd

from crowdtruth.models.worker import Worker
from crowdtruth.models.unit import Unit
from crowdtruth.models.job import Job
from crowdtruth.configuration import DefaultConfig
from crowdtruth.crowd_platform import *


# create an ordered counter so that we can maintain
# the position of tags in the order they were annotated
class OrderedCounter(Counter, OrderedDict):
    """ Instantiates an ordered counter. """
    pass

def create_ordered_counter(ordered_counter, annotation_vector):
    """ Instantiates an ordered counter from a given annotation vector. """
    for relation in annotation_vector:
        if relation not in ordered_counter:
            ordered_counter.update({relation: 0})
    return ordered_counter

def validate_timestamp_field(date_string, date_format):
    """ Validates the time columns (started time and submitted time) in input files. """

    try:
        date_obj = datetime.datetime.strptime(date_string, date_format)
        print(date_obj)
    except ValueError:
        raise ValueError('Incorrect date format')

def get_file_list(directory):
    """ List the files in the directory given as argument. """
    filelist = []

    # go through all files in this folder
    for file in os.listdir(directory):
        # if it is a folder scan it
        if os.path.isdir(directory+'/'+file):
            sublist = get_file_list(directory+'/'+file)
            sublist_length = len(sublist)
            if sublist_length:
                filelist.append(sublist)

        # if it is a csv file open it
        if file.endswith('.csv') and file != 'groundtruth.csv':
            filelist.append(file)
    return filelist

def list_files(kwargs, results, config):
    """ Creates a list of files to be processed. """
    files = []
    directory = ""
    if 'data_frame' in kwargs:
        res, config = process_file(kwargs['data_frame'], config)
        for value in res:
            results[value].append(res[value])
        return results
    elif 'file' in kwargs and kwargs['file'].endswith('.csv'):
        files = [kwargs['file']]
    elif 'directory' in kwargs:
        directory = kwargs['directory']
        files = get_file_list(directory)
        logging.info('Found ' + str(len(files)) + ' files')
    else:
        raise ValueError('No input was provided')

    for file in files:
        if 'directory' in locals() and directory != "":
            logging.info("Processing " + file)
            file = directory + "/" + file
        if config.csv_file_separator:
            judgments = pd.read_csv(file, sep=config.csv_file_separator)#, encoding=result['encoding'])
        else:
            judgments = pd.read_csv(file)#, encoding=result['encoding'])
        res, config = process_file(judgments, config, filename=file)
        for value in res:
            results[value].append(res[value])

    return results

def load(**kwargs):
    """ Loads the input files. """

    # placeholder for aggregated results
    results = {
        'jobs' : [],
        'units' : [],
        'workers' : [],
        'judgments' : [],
        'annotations' : []
        }

    if 'config' not in kwargs:
        config = DefaultConfig()
    else:
        logging.info('Config loaded')
        config = kwargs['config']

    results = list_files(kwargs, results, config)
    for value in results:
        results[value] = pd.concat(results[value])

    # workers and annotations can appear across jobs, so we have to aggregate those extra
    results['workers'] = results['workers'].groupby(results['workers'].index).agg({
        'unit' : 'sum',
        'judgment' : 'sum',
        'job' : 'count',
        'duration' : 'mean'
        })

    # aggregate annotations
    results['annotations'] = results['annotations'].groupby(results['annotations'].index).sum()

    return results, config

def remove_empty_rows(config, judgments):
    """ handle rows where the worker did not give an answer (AMT issue) """

    # if config keeps empty rows, add NONE placeholder token
    if not config.remove_empty_rows:
        for col in config.outputColumns:
            for idx in range(len(judgments[col])):
                if (pd.isnull(judgments[col][idx]) or
                        judgments[col][idx] is None or
                        judgments[col][idx] == '' or
                        judgments[col][idx] == 'nan'):
                    logging.info('judgments[' + str(idx) + '][' + col + '] is None')
                    judgments.at[idx, col] = config.none_token
    # remove empty rows
    else:
        empty_rows = set()
        for col in config.outputColumns:
            empty_rows = empty_rows.union(judgments[pd.isnull(judgments[col]) == True].index)
            empty_rows = empty_rows.union(judgments[judgments[col] == 'nan'].index)
        for col in config.outputColumns:
            judgments = judgments[pd.isnull(judgments[col]) == False]
            judgments = judgments[judgments[col] != 'nan']
        judgments = judgments.reset_index(drop=True)
        count_empty_rows = len(empty_rows)
        if count_empty_rows > 0:
            if count_empty_rows == 1:
                logging.warning(str(count_empty_rows) + " row with incomplete information in "
                                "output columns was removed.")
            else:
                logging.warning(str(count_empty_rows) + " rows with incomplete information in "
                                "output columns were removed.")
    return judgments

def remove_single_judgment_units(judgments):
    """ remove units with just 1 judgment """
    units_1work = judgments.groupby('unit').filter(lambda x: len(x) == 1)["unit"]
    judgments = judgments[~judgments['unit'].isin(units_1work)]
    judgments = judgments.reset_index(drop=True)
    no_units_1work = len(units_1work)
    if no_units_1work > 0:
        if no_units_1work == 1:
            logging.warning(str(no_units_1work) + " Media Unit that was annotated by only"
                            " 1 Worker was omitted, since agreement cannot be calculated.")
        else:
            logging.warning(str(no_units_1work) + " Media Units that were annotated by only"
                            " 1 Worker were omitted, since agreement cannot be calculated.")
    return judgments

def make_output_cols_safe_keys(config, judgments):
    """ make output values safe keys """
    for col in config.output.values():
        if isinstance(judgments[col].iloc[0], dict):
            logging.info("Values stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter(x))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x), config.annotation_vector))
        else:
            logging.info("Values not stored as dictionary")
            if config.open_ended_task:
                judgments[col] = judgments[col].apply(lambda x: OrderedCounter( \
                                                      x.split(config.annotation_separator)))
            else:
                judgments[col] = judgments[col].apply(lambda x: create_ordered_counter( \
                                 OrderedCounter(x.split(config.annotation_separator)), \
                                 config.annotation_vector))
    return judgments


def add_missing_values(config, units):
    """ Adds missing vector values if it is a closed task """
    for col in config.output.values():
        try:
            # openended = config.open_ended_task
            for idx in list(units.index):
                for relation in config.annotation_vector:
                    if relation not in units[col][idx]:
                        units[col][idx].update({relation : 0})
            return units
        except AttributeError:
            continue

def aggregate_annotations(config, judgments):
    """ Aggregates annotations and adds judgments stats. """
    annotations = pd.DataFrame()
    for col in config.output.values():
        judgments[col+'.count'] = judgments[col].apply(lambda x: sum(x.values()))
        judgments[col+'.unique'] = judgments[col].apply(lambda x: len(x))
        res = pd.DataFrame(judgments[col].apply(lambda x: \
              pd.Series(list(x.keys())).value_counts()).sum(), columns=[col])
        annotations = pd.concat([annotations, res], axis=0)
    return annotations, judgments

def process_file(judgments, config, filename=""):
    """ Processes input files with the given configuration """

    platform = get_platform(judgments)

    if platform is False:
        logging.info("Custom crowdsourcing platform!")
        no_of_columns = len(config.customPlatformColumns)
        if no_of_columns != 5:
            logging.warning("The following column names are required: judgment id, "
                            "unit id, worker id, start time, submit time")
            raise ValueError('No custom platform configuration was provided')
        else:

            platform = {
                #'id'       : 'custom',
                config.customPlatformColumns[0] : 'judgment',
                config.customPlatformColumns[1] : 'unit',
                config.customPlatformColumns[2] : 'worker',
                config.customPlatformColumns[3] : 'started',
                config.customPlatformColumns[4] : 'submitted'
            }


    # we must establish which fields were part of the input data and which are output judgments
    # if there is a config, check if there is a definition of which fields to use
    #config = []
    # else use the default and select them automatically
    config = get_column_types(judgments, config)

    # allow customization of the judgments
    judgments = config.processJudgments(judgments)

    # handle empty rows
    judgments = remove_empty_rows(config, judgments)

    # update the config after the preprocessing of judgments
    config = get_column_types(judgments, config)

    all_columns = dict(list(config.input.items()) + list(config.output.items()) \
                       + list(platform.items()))
    # allColumns = dict(config.input.items() | config.output.items() | platform.items())
    judgments = judgments.rename(columns=all_columns)

    # remove columns we don't care about
    judgments = judgments[list(all_columns.values())]

    if filename != "":
        judgments['job'] = filename.split('.csv')[0]
    else:
        judgments['job'] = "pd.DataFrame"

    # make output values safe keys
    judgments = make_output_cols_safe_keys(config, judgments)

    # remove units with just 1 judgment
    judgments = remove_single_judgment_units(judgments)

    judgments['started'] = judgments['started'].apply(lambda x: dateparser.parse(str(x)))
    judgments['submitted'] = judgments['submitted'].apply(lambda x: dateparser.parse(str(x)))
    judgments['duration'] = judgments.apply(lambda row: (row['submitted'] - row['started']).seconds,
                                            axis=1)

    #
    # aggregate units
    #
    units = Unit.aggregate(judgments, config)

    #
    # aggregate annotations
    # i.e. output columns
    #
    annotations, judgments = aggregate_annotations(config, judgments)

    #
    # aggregate workers
    #
    workers = Worker.aggregate(judgments, config)

    #
    # aggregate job
    #
    job = Job.aggregate(units, judgments, config)

    # Clean up judgments
    # remove input columns from judgments
    output_cols = [col for col in judgments.columns.values \
                    if col.startswith('output') or col.startswith('metric')]
    judgments = judgments[output_cols + list(platform.values()) + ['duration', 'job']]

    # set judgment id as index
    judgments.set_index('judgment', inplace=True)

    # add missing vector values if closed task
    units = add_missing_values(config, units)

    return {
        'jobs' : job,
        'units' : units,
        'workers' : workers,
        'judgments' : judgments,
        'annotations' : annotations,
        }, config
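For orientation, a minimal usage sketch of the loader defined above. The directory name is a hypothetical placeholder, and in practice a project-specific DefaultConfig subclass (input/output columns, annotation vector, separators) is normally passed via the config keyword:

from crowdtruth.load import load
from crowdtruth.configuration import DefaultConfig

# "my_input_dir" is a hypothetical folder of CSV judgment files;
# get_file_list() skips any file named groundtruth.csv.
results, config = load(directory="my_input_dir", config=DefaultConfig())

# results maps 'jobs', 'units', 'workers', 'judgments' and 'annotations'
# to aggregated pandas DataFrames, as returned by load() above.
print(results['units'].head())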